This is an automated email from the ASF dual-hosted git repository. guangmingchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push: new c7add74f Support ProtoJson formatted http body (#2921) c7add74f is described below commit c7add74fab5c65d80f8d126fe0a51190ca070fac Author: Bright Chen <chenguangmin...@foxmail.com> AuthorDate: Sun Mar 30 15:02:26 2025 +0800 Support ProtoJson formatted http body (#2921) --- src/brpc/extension.h | 2 +- src/brpc/policy/http_rpc_protocol.cpp | 171 ++++++++++++++++++--------- src/brpc/policy/http_rpc_protocol.h | 1 + src/bthread/key.cpp | 5 +- src/butil/logging.cc | 2 +- src/butil/memory/singleton_on_pthread_once.h | 10 +- src/json2pb/json_to_pb.cpp | 48 ++++++-- src/json2pb/json_to_pb.h | 29 +++-- src/json2pb/pb_to_json.cpp | 41 ++++++- src/json2pb/pb_to_json.h | 15 ++- src/json2pb/protobuf_type_resolver.cpp | 36 ++++++ src/json2pb/protobuf_type_resolver.h | 71 +++++++++++ test/addressbook_map.proto | 10 ++ test/brpc_http_rpc_protocol_unittest.cpp | 74 +++++++++++- test/brpc_protobuf_json_unittest.cpp | 103 ++++++++++++++++ test/bthread_rwlock_unittest.cpp | 10 +- 16 files changed, 536 insertions(+), 92 deletions(-) diff --git a/src/brpc/extension.h b/src/brpc/extension.h index 7190ac91..7854362e 100644 --- a/src/brpc/extension.h +++ b/src/brpc/extension.h @@ -48,7 +48,7 @@ public: void List(std::ostream& os, char separator); private: -friend class butil::GetLeakySingleton<Extension<T> >; +template <typename U> friend U* butil::create_leaky_singleton_obj(); Extension() = default; butil::CaseIgnoredFlatMap<T*> _instance_map; butil::Mutex _map_mutex; diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index a1353b14..e0ff1e31 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -19,27 +19,26 @@ #include <google/protobuf/descriptor.h> // MethodDescriptor #include <google/protobuf/text_format.h> #include <gflags/gflags.h> -#include <json2pb/pb_to_json.h> // ProtoMessageToJson -#include <json2pb/json_to_pb.h> // JsonToProtoMessage #include <string> - #include "brpc/policy/http_rpc_protocol.h" #include "butil/unique_ptr.h" // std::unique_ptr #include "butil/string_splitter.h" // StringMultiSplitter #include "butil/string_printf.h" #include "butil/time.h" #include "butil/sys_byteorder.h" +#include "json2pb/pb_to_json.h" // ProtoMessageToJson +#include "json2pb/json_to_pb.h" // JsonToProtoMessage #include "brpc/compress.h" -#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD -#include "brpc/controller.h" // Controller -#include "brpc/server.h" // Server +#include "brpc/errno.pb.h" // ENOSERVICE, ENOMETHOD +#include "brpc/controller.h" // Controller +#include "brpc/server.h" // Server #include "brpc/details/server_private_accessor.h" #include "brpc/span.h" -#include "brpc/socket.h" // Socket -#include "brpc/rpc_dump.h" // SampledRequest -#include "brpc/http_status_code.h" // HTTP_STATUS_* +#include "brpc/socket.h" // Socket +#include "brpc/rpc_dump.h" // SampledRequest +#include "brpc/http_status_code.h" // HTTP_STATUS_* #include "brpc/details/controller_private_accessor.h" -#include "brpc/builtin/index_service.h" // IndexService +#include "brpc/builtin/index_service.h" // IndexService #include "brpc/policy/gzip_compress.h" #include "brpc/policy/http2_rpc_protocol.h" #include "brpc/details/usercode_backup_pool.h" @@ -203,6 +202,9 @@ HttpContentType ParseContentType(butil::StringPiece ct, bool* is_grpc_ct) { if (ct.starts_with("json")) { type = HTTP_CONTENT_JSON; ct.remove_prefix(4); + } else if (ct.starts_with("proto-json")) { + type = HTTP_CONTENT_PROTO_JSON; + ct.remove_prefix(10); } else if (ct.starts_with("proto-text")) { type = HTTP_CONTENT_PROTO_TEXT; ct.remove_prefix(10); @@ -271,6 +273,79 @@ static bool RemoveGrpcPrefix(butil::IOBuf* body, bool* compressed) { return (message_length + 5 == sz); } +static bool JsonToProtoMessage(const butil::IOBuf& body, + google::protobuf::Message* message, + Controller* cntl, int error_code) { + butil::IOBufAsZeroCopyInputStream wrapper(body); + json2pb::Json2PbOptions options; + options.base64_to_bytes = cntl->has_pb_bytes_to_base64(); + options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array(); + std::string error; + bool ok = json2pb::JsonToProtoMessage(&wrapper, message, options, &error); + if (!ok) { + cntl->SetFailed(error_code, "Fail to parse http json body as %s: %s", + message->GetDescriptor()->full_name().c_str(), + error.c_str()); + } + return ok; +} + +static bool ProtoMessageToJson(const google::protobuf::Message& message, + butil::IOBufAsZeroCopyOutputStream* wrapper, + Controller* cntl, int error_code) { + json2pb::Pb2JsonOptions options; + options.bytes_to_base64 = cntl->has_pb_bytes_to_base64(); + options.jsonify_empty_array = cntl->has_pb_jsonify_empty_array(); + options.always_print_primitive_fields = cntl->has_always_print_primitive_fields(); + options.single_repeated_to_array = cntl->has_pb_single_repeated_to_array(); + options.enum_option = FLAGS_pb_enum_as_number + ? json2pb::OUTPUT_ENUM_BY_NUMBER + : json2pb::OUTPUT_ENUM_BY_NAME; + std::string error; + bool ok = json2pb::ProtoMessageToJson(message, wrapper, options, &error); + if (!ok) { + cntl->SetFailed(error_code, "Fail to convert %s to json: %s", + message.GetDescriptor()->full_name().c_str(), + error.c_str()); + } + return ok; +} + +static bool ProtoJsonToProtoMessage(const butil::IOBuf& body, + google::protobuf::Message* message, + Controller* cntl, int error_code) { + json2pb::ProtoJson2PbOptions options; + options.ignore_unknown_fields = true; + butil::IOBufAsZeroCopyInputStream wrapper(body); + std::string error; + bool ok = json2pb::ProtoJsonToProtoMessage(&wrapper, message, options, &error); + if (!ok) { + cntl->SetFailed(error_code, "Fail to parse http proto-json body as %s: %s", + message->GetDescriptor()->full_name().c_str(), + error.c_str()); + } + return ok; +} + +static bool ProtoMessageToProtoJson(const google::protobuf::Message& message, + butil::IOBufAsZeroCopyOutputStream* wrapper, + Controller* cntl, int error_code) { + json2pb::Pb2ProtoJsonOptions options; +#if GOOGLE_PROTOBUF_VERSION >= 5026002 + options.always_print_fields_with_no_presence = cntl->has_always_print_primitive_fields(); +#else + options.always_print_primitive_fields = cntl->has_always_print_primitive_fields(); +#endif + options.always_print_enums_as_ints = FLAGS_pb_enum_as_number; + std::string error; + bool ok = json2pb::ProtoMessageToProtoJson(message, wrapper, options, &error); + if (!ok) { + cntl->SetFailed(error_code, "Fail to convert %s to proto-json: %s", + message.GetDescriptor()->full_name().c_str(), error.c_str()); + } + return ok; +} + void ProcessHttpResponse(InputMessageBase* msg) { const int64_t start_parse_us = butil::cpuwide_time_us(); DestroyingPtr<HttpContext> imsg_guard(static_cast<HttpContext*>(msg)); @@ -435,8 +510,8 @@ void ProcessHttpResponse(InputMessageBase* msg) { if (grpc_compressed) { encoding = res_header->GetHeader(common->GRPC_ENCODING); if (encoding == NULL) { - cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding'" - " in compressed gRPC response"); + cntl->SetFailed(ERESPONSE, "Fail to find header `grpc-encoding' " + "in compressed gRPC response"); break; } } @@ -455,23 +530,24 @@ void ProcessHttpResponse(InputMessageBase* msg) { } if (content_type == HTTP_CONTENT_PROTO) { if (!ParsePbFromIOBuf(cntl->response(), res_body)) { - cntl->SetFailed(ERESPONSE, "Fail to parse content"); + cntl->SetFailed(ERESPONSE, "Fail to parse content as %s", + cntl->response()->GetDescriptor()->full_name().c_str()); break; } } else if (content_type == HTTP_CONTENT_PROTO_TEXT) { if (!ParsePbTextFromIOBuf(cntl->response(), res_body)) { - cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content"); + cntl->SetFailed(ERESPONSE, "Fail to parse proto-text content as %s", + cntl->response()->GetDescriptor()->full_name().c_str()); break; } } else if (content_type == HTTP_CONTENT_JSON) { - // message body is json - butil::IOBufAsZeroCopyInputStream wrapper(res_body); - std::string err; - json2pb::Json2PbOptions options; - options.base64_to_bytes = cntl->has_pb_bytes_to_base64(); - options.array_to_single_repeated = cntl->has_pb_single_repeated_to_array(); - if (!json2pb::JsonToProtoMessage(&wrapper, cntl->response(), options, &err)) { - cntl->SetFailed(ERESPONSE, "Fail to parse content, %s", err.c_str()); + // Message body is json. + if (!JsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) { + break; + } + } else if (content_type == HTTP_CONTENT_PROTO_JSON) { + // Message body is json. + if (!ProtoJsonToProtoMessage(res_body, cntl->response(), cntl, ERESPONSE)) { break; } } else { @@ -530,8 +606,7 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, } } else { bool is_grpc_ct = false; - content_type = ParseContentType(hreq.content_type(), - &is_grpc_ct); + content_type = ParseContentType(hreq.content_type(), &is_grpc_ct); is_grpc = (is_http2 && is_grpc_ct); } @@ -549,21 +624,15 @@ void SerializeHttpRequest(butil::IOBuf* /*not used*/, return cntl->SetFailed(EREQUEST, "Fail to print %s as proto-text", pbreq->GetTypeName().c_str()); } + } else if (content_type == HTTP_CONTENT_PROTO_JSON) { + if (!ProtoMessageToProtoJson(*pbreq, &wrapper, cntl, EREQUEST)) { + cntl->request_attachment().clear(); + return; + } } else if (content_type == HTTP_CONTENT_JSON) { - std::string err; - json2pb::Pb2JsonOptions opt; - opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64(); - opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array(); - opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields(); - opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array(); - - opt.enum_option = (FLAGS_pb_enum_as_number - ? json2pb::OUTPUT_ENUM_BY_NUMBER - : json2pb::OUTPUT_ENUM_BY_NAME); - if (!json2pb::ProtoMessageToJson(*pbreq, &wrapper, opt, &err)) { + if (!ProtoMessageToJson(*pbreq, &wrapper, cntl, EREQUEST)) { cntl->request_attachment().clear(); - return cntl->SetFailed( - EREQUEST, "Fail to convert request to json, %s", err.c_str()); + return; } } else { return cntl->SetFailed( @@ -819,19 +888,10 @@ HttpResponseSender::~HttpResponseSender() { if (!google::protobuf::TextFormat::Print(*res, &wrapper)) { cntl->SetFailed(ERESPONSE, "Fail to print %s as proto-text", res->GetTypeName().c_str()); } + } else if (content_type == HTTP_CONTENT_PROTO_JSON) { + ProtoMessageToProtoJson(*res, &wrapper, cntl, ERESPONSE); } else { - std::string err; - json2pb::Pb2JsonOptions opt; - opt.bytes_to_base64 = cntl->has_pb_bytes_to_base64(); - opt.jsonify_empty_array = cntl->has_pb_jsonify_empty_array(); - opt.always_print_primitive_fields = cntl->has_always_print_primitive_fields(); - opt.single_repeated_to_array = cntl->has_pb_single_repeated_to_array(); - opt.enum_option = (FLAGS_pb_enum_as_number - ? json2pb::OUTPUT_ENUM_BY_NUMBER - : json2pb::OUTPUT_ENUM_BY_NAME); - if (!json2pb::ProtoMessageToJson(*res, &wrapper, opt, &err)) { - cntl->SetFailed(ERESPONSE, "Fail to convert response to json, %s", err.c_str()); - } + ProtoMessageToJson(*res, &wrapper, cntl, ERESPONSE); } } @@ -1610,17 +1670,14 @@ void ProcessHttpRequest(InputMessageBase *msg) { req->GetDescriptor()->full_name().c_str()); return; } + } else if (content_type == HTTP_CONTENT_PROTO_JSON) { + if (!ProtoJsonToProtoMessage(req_body, req, cntl, EREQUEST)) { + return; + } } else { - butil::IOBufAsZeroCopyInputStream wrapper(req_body); - std::string err; - json2pb::Json2PbOptions options; - options.base64_to_bytes = mp->params.pb_bytes_to_base64; - options.array_to_single_repeated = mp->params.pb_single_repeated_to_array; cntl->set_pb_bytes_to_base64(mp->params.pb_bytes_to_base64); cntl->set_pb_single_repeated_to_array(mp->params.pb_single_repeated_to_array); - if (!json2pb::JsonToProtoMessage(&wrapper, req, options, &err)) { - cntl->SetFailed(EREQUEST, "Fail to parse http body as %s, %s", - req->GetDescriptor()->full_name().c_str(), err.c_str()); + if (!JsonToProtoMessage(req_body, req, cntl, EREQUEST)) { return; } } diff --git a/src/brpc/policy/http_rpc_protocol.h b/src/brpc/policy/http_rpc_protocol.h index 918e69d0..bc8bd065 100644 --- a/src/brpc/policy/http_rpc_protocol.h +++ b/src/brpc/policy/http_rpc_protocol.h @@ -149,6 +149,7 @@ enum HttpContentType { HTTP_CONTENT_JSON = 1, HTTP_CONTENT_PROTO = 2, HTTP_CONTENT_PROTO_TEXT = 3, + HTTP_CONTENT_PROTO_JSON = 4, }; // Parse from the textual content type. One type may have more than one literals. diff --git a/src/bthread/key.cpp b/src/bthread/key.cpp index 1bf5ec89..00215d7f 100644 --- a/src/bthread/key.cpp +++ b/src/bthread/key.cpp @@ -222,8 +222,7 @@ private: class BAIDU_CACHELINE_ALIGNMENT KeyTableList { public: KeyTableList() : - _head(NULL), _tail(NULL), _length(0) { - } + _head(NULL), _tail(NULL), _length(0) {} ~KeyTableList() { TaskGroup* g = BAIDU_GET_VOLATILE_THREAD_LOCAL(tls_task_group); @@ -305,7 +304,7 @@ public: return count; } - inline uint32_t get_length() { + inline uint32_t get_length() const { return _length; } diff --git a/src/butil/logging.cc b/src/butil/logging.cc index 9b09af6d..2f759f6a 100644 --- a/src/butil/logging.cc +++ b/src/butil/logging.cc @@ -395,7 +395,7 @@ bool InitializeLogFileHandle() { #elif defined(OS_POSIX) log_file = fopen(log_file_name->c_str(), "a"); if (log_file == NULL) { - fprintf(stderr, "Fail to fopen %s", log_file_name->c_str()); + fprintf(stderr, "Fail to fopen %s: %s", log_file_name->c_str(), berror()); return false; } #endif diff --git a/src/butil/memory/singleton_on_pthread_once.h b/src/butil/memory/singleton_on_pthread_once.h index 378d708b..9699bba7 100644 --- a/src/butil/memory/singleton_on_pthread_once.h +++ b/src/butil/memory/singleton_on_pthread_once.h @@ -25,7 +25,13 @@ namespace butil { -template <typename T> class GetLeakySingleton { +template <typename T> +T* create_leaky_singleton_obj() { + return new T(); +} + +template <typename T> +class GetLeakySingleton { public: static butil::subtle::AtomicWord g_leaky_singleton_untyped; static pthread_once_t g_create_leaky_singleton_once; @@ -39,7 +45,7 @@ pthread_once_t GetLeakySingleton<T>::g_create_leaky_singleton_once = PTHREAD_ONC template <typename T> void GetLeakySingleton<T>::create_leaky_singleton() { - T* obj = new T; + T* obj = create_leaky_singleton_obj<T>(); butil::subtle::Release_Store( &g_leaky_singleton_untyped, reinterpret_cast<butil::subtle::AtomicWord>(obj)); diff --git a/src/json2pb/json_to_pb.cpp b/src/json2pb/json_to_pb.cpp index f942253e..53887a38 100644 --- a/src/json2pb/json_to_pb.cpp +++ b/src/json2pb/json_to_pb.cpp @@ -27,14 +27,14 @@ #include "butil/strings/string_number_conversions.h" #include "butil/third_party/rapidjson/error/error.h" #include "butil/third_party/rapidjson/rapidjson.h" -#include "json_to_pb.h" -#include "zero_copy_stream_reader.h" // ZeroCopyStreamReader -#include "encode_decode.h" +#include "json2pb/json_to_pb.h" +#include "json2pb/zero_copy_stream_reader.h" // ZeroCopyStreamReader +#include "json2pb/encode_decode.h" +#include "json2pb/protobuf_map.h" +#include "json2pb/rapidjson.h" +#include "json2pb/protobuf_type_resolver.h" #include "butil/base64.h" -#include "butil/string_printf.h" -#include "protobuf_map.h" -#include "rapidjson.h" - +#include "butil/iobuf.h" #ifdef __GNUC__ // Ignore -Wnonnull for `(::google::protobuf::Message*)nullptr' of J2PERROR by design. @@ -712,6 +712,40 @@ bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *stream, std::string* error) { return JsonToProtoMessage(stream, message, Json2PbOptions(), error, nullptr); } + +bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, + google::protobuf::Message* message, + const ProtoJson2PbOptions& options, + std::string* error) { + TypeResolverUniqueptr type_resolver = GetTypeResolver(*message); + butil::IOBuf buf; + butil::IOBufAsZeroCopyOutputStream output_stream(&buf); + std::string type_url = GetTypeUrl(*message); + auto st = google::protobuf::util::JsonToBinaryStream( + type_resolver.get(), type_url, json, &output_stream, options); + + butil::IOBufAsZeroCopyInputStream input_stream(buf); + google::protobuf::io::CodedInputStream decoder(&input_stream); + if (!st.ok()) { + if (NULL != error) { + *error = st.ToString(); + } + return false; + } + + bool ok = message->ParseFromCodedStream(&decoder); + if (!ok && NULL != error) { + *error = "Fail to ParseFromCodedStream"; + } + return ok; +} + +bool ProtoJsonToProtoMessage(const std::string& json, google::protobuf::Message* message, + const ProtoJson2PbOptions& options, std::string* error) { + google::protobuf::io::ArrayInputStream input_stream(json.data(), json.size()); + return ProtoJsonToProtoMessage(&input_stream, message, options, error); +} + } //namespace json2pb #undef J2PERROR diff --git a/src/json2pb/json_to_pb.h b/src/json2pb/json_to_pb.h index 44203e08..78eb15b6 100644 --- a/src/json2pb/json_to_pb.h +++ b/src/json2pb/json_to_pb.h @@ -23,6 +23,7 @@ #include "json2pb/zero_copy_stream_reader.h" #include <google/protobuf/message.h> #include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyInputStream +#include <google/protobuf/util/json_util.h> namespace json2pb { @@ -43,7 +44,7 @@ struct Json2PbOptions { bool allow_remaining_bytes_after_parsing; }; -// Convert `json' to protobuf `message'. +// Convert `json' to protobuf `message' according to `options'. // Returns true on success. `error' (if not NULL) will be set with error // message on failure. // @@ -58,18 +59,18 @@ bool JsonToProtoMessage(const std::string& json, size_t* parsed_offset = nullptr); // Use ZeroCopyInputStream as input instead of std::string. -bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream *json, - google::protobuf::Message *message, - const Json2PbOptions &options, - std::string *error = nullptr, - size_t *parsed_offset = nullptr); +bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, + google::protobuf::Message* message, + const Json2PbOptions& options, + std::string* error = nullptr, + size_t* parsed_offset = nullptr); // Use ZeroCopyStreamReader as input instead of std::string. // If you need to parse multiple jsons from IOBuf, you should use this // overload instead of the ZeroCopyInputStream one which bases on this // and recreates a ZeroCopyStreamReader internally that can't be reused // between continuous calls. -bool JsonToProtoMessage(ZeroCopyStreamReader *json, +bool JsonToProtoMessage(ZeroCopyStreamReader* json, google::protobuf::Message* message, const Json2PbOptions& options, std::string* error = nullptr, @@ -83,6 +84,20 @@ bool JsonToProtoMessage(const std::string& json, bool JsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* stream, google::protobuf::Message* message, std::string* error = nullptr); + +// See <google/protobuf/util/json_util.h> for details. +using ProtoJson2PbOptions = google::protobuf::util::JsonParseOptions; + +// Convert ProtoJSON formatted `json' to protobuf `message' according to `options'. +// See https://protobuf.dev/programming-guides/json/ for details. +bool ProtoJsonToProtoMessage(google::protobuf::io::ZeroCopyInputStream* json, + google::protobuf::Message* message, + const ProtoJson2PbOptions& options, + std::string* error = NULL); +// Use default GoogleJson2PbOptions. +bool ProtoJsonToProtoMessage(const std::string& json, google::protobuf::Message* message, + const ProtoJson2PbOptions& options, std::string* error = NULL); + } // namespace json2pb #endif // BRPC_JSON2PB_JSON_TO_PB_H diff --git a/src/json2pb/pb_to_json.cpp b/src/json2pb/pb_to_json.cpp index b0066dc6..0dc94814 100644 --- a/src/json2pb/pb_to_json.cpp +++ b/src/json2pb/pb_to_json.cpp @@ -22,12 +22,14 @@ #include <sys/time.h> #include <time.h> #include <google/protobuf/descriptor.h> +#include "json2pb/zero_copy_stream_writer.h" +#include "json2pb/encode_decode.h" +#include "json2pb/protobuf_map.h" +#include "json2pb/rapidjson.h" +#include "json2pb/pb_to_json.h" +#include "json2pb/protobuf_type_resolver.h" +#include "butil/iobuf.h" #include "butil/base64.h" -#include "zero_copy_stream_writer.h" -#include "encode_decode.h" -#include "protobuf_map.h" -#include "rapidjson.h" -#include "pb_to_json.h" namespace json2pb { Pb2JsonOptions::Pb2JsonOptions() @@ -345,4 +347,33 @@ bool ProtoMessageToJson(const google::protobuf::Message& message, std::string* error) { return ProtoMessageToJson(message, stream, Pb2JsonOptions(), error); } + +bool ProtoMessageToProtoJson(const google::protobuf::Message& message, + google::protobuf::io::ZeroCopyOutputStream* json, + const Pb2ProtoJsonOptions& options, std::string* error) { + TypeResolverUniqueptr type_resolver = GetTypeResolver(message); + butil::IOBuf buf; + butil::IOBufAsZeroCopyOutputStream output_stream(&buf); + google::protobuf::io::CodedOutputStream coded_stream(&output_stream); + if (!message.SerializeToCodedStream(&coded_stream)) { + return false; + } + + butil::IOBufAsZeroCopyInputStream input_stream(buf); + auto st = google::protobuf::util::BinaryToJsonStream( + type_resolver.get(), GetTypeUrl(message), &input_stream, json, options); + + bool ok = st.ok(); + if (!ok && NULL != error) { + *error = st.ToString(); + } + return ok; +} + +bool ProtoMessageToProtoJson(const google::protobuf::Message& message, std::string* json, + const Pb2ProtoJsonOptions& options, std::string* error) { + google::protobuf::io::StringOutputStream output_stream(json); + return ProtoMessageToProtoJson(message, &output_stream, options, error); +} + } // namespace json2pb diff --git a/src/json2pb/pb_to_json.h b/src/json2pb/pb_to_json.h index 97057d0f..33311ffb 100644 --- a/src/json2pb/pb_to_json.h +++ b/src/json2pb/pb_to_json.h @@ -23,6 +23,7 @@ #include <string> #include <google/protobuf/message.h> #include <google/protobuf/io/zero_copy_stream.h> // ZeroCopyOutputStream +#include <google/protobuf/util/json_util.h> namespace json2pb { @@ -79,7 +80,7 @@ bool ProtoMessageToJson(const google::protobuf::Message& message, std::string* error = NULL); // send output to ZeroCopyOutputStream instead of std::string. bool ProtoMessageToJson(const google::protobuf::Message& message, - google::protobuf::io::ZeroCopyOutputStream *json, + google::protobuf::io::ZeroCopyOutputStream* json, const Pb2JsonOptions& options, std::string* error = NULL); @@ -90,6 +91,18 @@ bool ProtoMessageToJson(const google::protobuf::Message& message, bool ProtoMessageToJson(const google::protobuf::Message& message, google::protobuf::io::ZeroCopyOutputStream* json, std::string* error = NULL); + +// See <google/protobuf/util/json_util.h> for details. +using Pb2ProtoJsonOptions = google::protobuf::util::JsonOptions; + +// Convert protobuf `messge' to `json' in ProtoJSON format according to `options'. +// See https://protobuf.dev/programming-guides/json/ for details. +bool ProtoMessageToProtoJson(const google::protobuf::Message& message, + google::protobuf::io::ZeroCopyOutputStream* json, + const Pb2ProtoJsonOptions& options, std::string* error = NULL); +// Using default GooglePb2JsonOptions. +bool ProtoMessageToProtoJson(const google::protobuf::Message& message, std::string* json, + const Pb2ProtoJsonOptions& options, std::string* error = NULL); } // namespace json2pb #endif // BRPC_JSON2PB_PB_TO_JSON_H diff --git a/src/json2pb/protobuf_type_resolver.cpp b/src/json2pb/protobuf_type_resolver.cpp new file mode 100644 index 00000000..a75974fc --- /dev/null +++ b/src/json2pb/protobuf_type_resolver.cpp @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "json2pb/protobuf_type_resolver.h" + +namespace json2pb { + +using google::protobuf::DescriptorPool; +using google::protobuf::util::TypeResolver; +using google::protobuf::util::NewTypeResolverForDescriptorPool; + +TypeResolverUniqueptr GetTypeResolver(const google::protobuf::Message& message) { + auto pool = message.GetDescriptor()->file()->pool(); + bool is_generated_pool = pool == DescriptorPool::generated_pool(); + TypeResolver* resolver = is_generated_pool + ? butil::get_leaky_singleton<TypeResolver>() + : NewTypeResolverForDescriptorPool(PROTOBUF_TYPE_URL_PREFIX, pool); + return { resolver, TypeResolverDeleter(is_generated_pool) }; +} + +} // namespace json2pb + diff --git a/src/json2pb/protobuf_type_resolver.h b/src/json2pb/protobuf_type_resolver.h new file mode 100644 index 00000000..18993f18 --- /dev/null +++ b/src/json2pb/protobuf_type_resolver.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_PROTOBUF_TYPE_RESOLVER_H +#define BRPC_PROTOBUF_TYPE_RESOLVER_H + +#include <string> +#include <memory> +#include <google/protobuf/message.h> +#include <google/protobuf/util/type_resolver.h> +#include <google/protobuf/util/type_resolver_util.h> +#include "butil/string_printf.h" +#include "butil/memory/singleton_on_pthread_once.h" + +namespace json2pb { + +#define PROTOBUF_TYPE_URL_PREFIX "type.googleapis.com" + +inline std::string GetTypeUrl(const google::protobuf::Message& message) { + return butil::string_printf(PROTOBUF_TYPE_URL_PREFIX"/%s", + message.GetDescriptor()->full_name().c_str()); +} + +class TypeResolverDeleter { +public: + explicit TypeResolverDeleter(bool is_generated_pool) + : _is_generated_pool(is_generated_pool) {} + + void operator()(google::protobuf::util::TypeResolver* resolver) const { + if (!_is_generated_pool) { + delete resolver; + } + } +private: + bool _is_generated_pool; +}; + +using TypeResolverUniqueptr = std::unique_ptr< + google::protobuf::util::TypeResolver, TypeResolverDeleter>; + +TypeResolverUniqueptr GetTypeResolver(const google::protobuf::Message& message); + +} // namespace json2pb + +namespace butil { + +// Customized singleton object creation for google::protobuf::util::TypeResolver. +template<> +inline google::protobuf::util::TypeResolver* +create_leaky_singleton_obj<google::protobuf::util::TypeResolver>() { + return google::protobuf::util::NewTypeResolverForDescriptorPool( + PROTOBUF_TYPE_URL_PREFIX, google::protobuf::DescriptorPool::generated_pool()); +} + +} // namespace butil + +#endif // BRPC_PROTOBUF_TYPE_RESOLVER_H diff --git a/test/addressbook_map.proto b/test/addressbook_map.proto index d3b15466..4d18e68c 100644 --- a/test/addressbook_map.proto +++ b/test/addressbook_map.proto @@ -52,6 +52,16 @@ message AddressComplex { repeated FriendEntry friends = 2; } +message AddressIntMapStd { + required string addr = 1; + map<string, int32> numbers = 2; +} + +message AddressStringMapStd { + required string addr = 1; + map<string, string> contacts = 2; +} + message haha { repeated int32 a = 1; } \ No newline at end of file diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index ad631a2e..578e8f89 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -189,6 +189,20 @@ protected: return msg; } + brpc::policy::HttpContext* MakePostJsonStdRequestMessage(const std::string& path) { + brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false); + msg->header().uri().set_path(path); + msg->header().set_content_type("application/proto-json"); + msg->header().set_method(brpc::HTTP_METHOD_POST); + + test::EchoRequest req; + req.set_message(EXP_REQUEST); + butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body()); + json2pb::Pb2ProtoJsonOptions options; + EXPECT_TRUE(json2pb::ProtoMessageToProtoJson(req, &req_stream, options)); + return msg; + } + brpc::policy::HttpContext* MakePostProtoTextRequestMessage( const std::string& path) { brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false); @@ -334,7 +348,7 @@ TEST_F(HttpTest, parse_http_address) { TEST_F(HttpTest, verify_request) { { brpc::policy::HttpContext* msg = - MakePostRequestMessage("/EchoService/Echo"); + MakePostRequestMessage("/EchoService/Echo"); VerifyMessage(msg, false); msg->Destroy(); } @@ -350,6 +364,12 @@ TEST_F(HttpTest, verify_request) { VerifyMessage(msg, false); msg->Destroy(); } + { + brpc::policy::HttpContext* msg = + MakePostJsonStdRequestMessage("/EchoService/Echo"); + VerifyMessage(msg, false); + msg->Destroy(); + } { brpc::policy::HttpContext* msg = MakePostProtoTextRequestMessage("/EchoService/Echo"); @@ -1677,7 +1697,6 @@ TEST_F(HttpTest, spring_protobuf_content_type) { brpc::Controller cntl2; test::EchoService_Stub stub(&channel); - req.set_message(EXP_REQUEST); res.Clear(); cntl2.http_request().set_content_type("application/x-protobuf"); stub.Echo(&cntl2, &req, &res, nullptr); @@ -1770,7 +1789,7 @@ TEST_F(HttpTest, dump_http_request) { brpc::g_rpc_dump_sl.sampling_range = 0; } -TEST_F(HttpTest, spring_protobuf_text_content_type) { +TEST_F(HttpTest, proto_text_content_type) { const int port = 8923; brpc::Server server; EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE)); @@ -1795,6 +1814,55 @@ TEST_F(HttpTest, spring_protobuf_text_content_type) { ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString( cntl.response_attachment().to_string(), &res)); ASSERT_EQ(EXP_RESPONSE, res.message()); + + test::EchoService_Stub stub(&channel); + cntl.Reset(); + cntl.http_request().set_content_type("application/proto-text"); + res.Clear(); + stub.Echo(&cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()); + ASSERT_EQ(EXP_RESPONSE, res.message()); + ASSERT_EQ("application/proto-text", cntl.http_response().content_type()); +} + +TEST_F(HttpTest, proto_json_content_type) { + const int port = 8923; + brpc::Server server; + EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE)); + EXPECT_EQ(0, server.Start(port, nullptr)); + + brpc::Channel channel; + brpc::ChannelOptions options; + options.protocol = "http"; + ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options)); + + brpc::Controller cntl; + test::EchoRequest req; + test::EchoResponse res; + req.set_message(EXP_REQUEST); + cntl.http_request().set_method(brpc::HTTP_METHOD_POST); + cntl.http_request().uri() = "/EchoService/Echo"; + cntl.http_request().set_content_type("application/proto-json"); + json2pb::Pb2ProtoJsonOptions json_options; + butil::IOBufAsZeroCopyOutputStream output_stream(&cntl.request_attachment()); + ASSERT_TRUE(json2pb::ProtoMessageToProtoJson(req, &output_stream, json_options)); + channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr); + ASSERT_FALSE(cntl.Failed()); + ASSERT_EQ("application/proto-json", cntl.http_response().content_type()); + json2pb::ProtoJson2PbOptions parse_options; + parse_options.ignore_unknown_fields = true; + butil::IOBufAsZeroCopyInputStream input_stream(cntl.response_attachment()); + ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream, &res, parse_options)); + ASSERT_EQ(EXP_RESPONSE, res.message()); + + test::EchoService_Stub stub(&channel); + cntl.Reset(); + cntl.http_request().set_content_type("application/proto-json"); + res.Clear(); + stub.Echo(&cntl, &req, &res, nullptr); + ASSERT_FALSE(cntl.Failed()); + ASSERT_EQ(EXP_RESPONSE, res.message()); + ASSERT_EQ("application/proto-json", cntl.http_response().content_type()); } class HttpServiceImpl : public ::test::HttpService { diff --git a/test/brpc_protobuf_json_unittest.cpp b/test/brpc_protobuf_json_unittest.cpp index 5bb158d7..e8435d2c 100644 --- a/test/brpc_protobuf_json_unittest.cpp +++ b/test/brpc_protobuf_json_unittest.cpp @@ -1640,4 +1640,107 @@ TEST_F(ProtobufJsonTest, parse_multiple_json_error) { ASSERT_EQ(47ul, offset); } +TEST_F(ProtobufJsonTest, proto_json_to_pb) { + std::string error; + json2pb::ProtoJson2PbOptions options; + + std::string json1 = R"({"addr":"baidu.com",)" + R"("numbers":[{"key":"tel","value":123456},{"key":"cell","value":654321}]})"; + AddressIntMapStd aims; + ASSERT_FALSE(json2pb::ProtoJsonToProtoMessage(json1, &aims, options, &error)); + LOG(INFO) << "Fail to ProtoJsonToProtoMessage: " << error; + + error.clear(); + butil::IOBuf json_buf1; + json_buf1.append(json1); + butil::IOBufAsZeroCopyInputStream input_stream1(json_buf1); + ASSERT_FALSE(json2pb::ProtoJsonToProtoMessage(&input_stream1, &aims, options, &error)); + LOG(INFO) << "Fail to ProtoJsonToProtoMessage: " << error; + error.clear(); + + std::string json2 = R"({"addr":"baidu.com",)" + R"("numbers":{"tel":123456,"cell":654321}})"; + ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(json2, &aims, options, &error)) << error; + ASSERT_TRUE(aims.has_addr()); + ASSERT_EQ(aims.addr(), "baidu.com"); + ASSERT_EQ(aims.numbers_size(), 2); + ASSERT_EQ(aims.numbers().at("tel"), 123456); + ASSERT_EQ(aims.numbers().at("cell"), 654321); + + aims.Clear(); + butil::IOBuf json_buf2; + json_buf2.append(json2); + butil::IOBufAsZeroCopyInputStream input_stream2(json_buf2); + ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream2, &aims, options, &error)) << error; + ASSERT_TRUE(aims.has_addr()); + ASSERT_EQ(aims.addr(), "baidu.com"); + ASSERT_EQ(aims.numbers_size(), 2); + ASSERT_EQ(aims.numbers().at("tel"), 123456); + ASSERT_EQ(aims.numbers().at("cell"), 654321); + + std::string json3 = R"({"addr":"baidu.com",)" + R"("contacts":{"email":"fr...@apache.org","office":"Shanghai"}})"; + AddressStringMapStd asms; + ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(json3, &asms, options, &error)) << error; + ASSERT_TRUE(asms.has_addr()); + ASSERT_EQ(asms.addr(), "baidu.com"); + ASSERT_EQ(asms.contacts().size(), 2); + ASSERT_EQ(asms.contacts().at("email"), "fr...@apache.org"); + ASSERT_EQ(asms.contacts().at("office"), "Shanghai"); + + asms.Clear(); + butil::IOBuf json_buf3; + json_buf3.append(json3); + butil::IOBufAsZeroCopyInputStream input_stream3(json_buf3); + ASSERT_TRUE(json2pb::ProtoJsonToProtoMessage(&input_stream3, &asms, options, &error)) << error; + ASSERT_TRUE(asms.has_addr()); + ASSERT_EQ(asms.addr(), "baidu.com"); + ASSERT_EQ(asms.contacts().size(), 2); + ASSERT_EQ(asms.contacts().at("email"), "fr...@apache.org"); + ASSERT_EQ(asms.contacts().at("office"), "Shanghai"); +} + +TEST_F(ProtobufJsonTest, pb_to_proto_json) { + std::string error; + json2pb::Pb2ProtoJsonOptions options; + + AddressIntMapStd aims; + aims.set_addr("baidu.com"); + (*aims.mutable_numbers())["tel"] = 123456; + (*aims.mutable_numbers())["cell"] = 654321; + std::string json1; + ASSERT_TRUE(json2pb::ProtoMessageToJson(aims, &json1)) << error; + ASSERT_NE(json1.find(R"("addr":"baidu.com")"), std::string::npos); + ASSERT_NE(json1.find(R"("cell":654321)"), std::string::npos); + ASSERT_NE(json1.find(R"("tel":123456)"), std::string::npos); + + butil::IOBuf json_buf1; + json_buf1.append(json1); + butil::IOBufAsZeroCopyOutputStream output_stream1(&json_buf1); + ASSERT_TRUE(json2pb::ProtoMessageToJson(aims, &output_stream1, &error)) << error; + json1 = json_buf1.to_string(); + ASSERT_NE(json1.find(R"("addr":"baidu.com")"), std::string::npos); + ASSERT_NE(json1.find(R"("cell":654321)"), std::string::npos); + ASSERT_NE(json1.find(R"("tel":123456)"), std::string::npos); + + AddressStringMapStd asms; + asms.set_addr("baidu.com"); + (*asms.mutable_contacts())["email"] = "fr...@apache.org"; + (*asms.mutable_contacts())["office"] = "Shanghai"; + std::string json2; + ASSERT_TRUE(json2pb::ProtoMessageToJson(asms, &json2)) << error; + ASSERT_NE(json2.find(R"("addr":"baidu.com")"), std::string::npos); + ASSERT_NE(json2.find(R"("email":"fr...@apache.org")"), std::string::npos); + ASSERT_NE(json2.find(R"("office":"Shanghai")"), std::string::npos); + + butil::IOBuf json_buf2; + json_buf2.append(json2); + butil::IOBufAsZeroCopyOutputStream output_stream2(&json_buf2); + ASSERT_TRUE(json2pb::ProtoMessageToJson(asms, &output_stream2, &error)) << error; + json2 = json_buf2.to_string(); + ASSERT_NE(json2.find(R"("addr":"baidu.com")"), std::string::npos); + ASSERT_NE(json2.find(R"("email":"fr...@apache.org")"), std::string::npos); + ASSERT_NE(json2.find(R"("office":"Shanghai")"), std::string::npos); +} + } // namespace diff --git a/test/bthread_rwlock_unittest.cpp b/test/bthread_rwlock_unittest.cpp index 815494ca..2da226cb 100644 --- a/test/bthread_rwlock_unittest.cpp +++ b/test/bthread_rwlock_unittest.cpp @@ -26,9 +26,9 @@ int c = 0; void* rdlocker(void* arg) { auto rw = (bthread_rwlock_t*)arg; bthread_rwlock_rdlock(rw); - LOG(INFO) <<butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %" PRId64 "ms\n", - pthread_numeric_id(), ++c, - butil::cpuwide_time_ms() - start_time); + LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm rdlocker, %d, %" PRId64 "ms\n", + pthread_numeric_id(), ++c, + butil::cpuwide_time_ms() - start_time); bthread_usleep(10000); bthread_rwlock_unlock(rw); return NULL; @@ -38,8 +38,8 @@ void* wrlocker(void* arg) { auto rw = (bthread_rwlock_t*)arg; bthread_rwlock_wrlock(rw); LOG(INFO) << butil::string_printf("[%" PRIu64 "] I'm wrlocker, %d, %" PRId64 "ms\n", - pthread_numeric_id(), ++c, - butil::cpuwide_time_ms() - start_time); + pthread_numeric_id(), ++c, + butil::cpuwide_time_ms() - start_time); bthread_usleep(10000); bthread_rwlock_unlock(rw); return NULL; --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org