Repository: thrift Updated Branches: refs/heads/master b31f0900b -> 792db4e92
THRIFT-2423 Facebook's THeader protocol and transport for cpp Client: C++ Library, Compiler Patch: Dave Watson rebased by Nobuaki Sukegawa This closes #357 and closes #677 Project: http://git-wip-us.apache.org/repos/asf/thrift/repo Commit: http://git-wip-us.apache.org/repos/asf/thrift/commit/792db4e9 Tree: http://git-wip-us.apache.org/repos/asf/thrift/tree/792db4e9 Diff: http://git-wip-us.apache.org/repos/asf/thrift/diff/792db4e9 Branch: refs/heads/master Commit: 792db4e92607a38a45eeb57f9561210cd4d4ef73 Parents: b31f090 Author: Dave Watson <[email protected]> Authored: Fri Jan 16 11:22:01 2015 -0800 Committer: Nobuaki Sukegawa <[email protected]> Committed: Wed Nov 4 01:25:22 2015 +0900 ---------------------------------------------------------------------- doc/specs/HeaderFormat.md | 82 +++ lib/c_glib/test/CMakeLists.txt | 2 +- lib/cpp/CMakeLists.txt | 4 + lib/cpp/Makefile.am | 8 +- lib/cpp/src/thrift/protocol/TBinaryProtocol.h | 3 +- lib/cpp/src/thrift/protocol/THeaderProtocol.cpp | 251 ++++++++ lib/cpp/src/thrift/protocol/THeaderProtocol.h | 212 +++++++ lib/cpp/src/thrift/protocol/TProtocol.h | 5 + lib/cpp/src/thrift/protocol/TProtocolTypes.h | 34 ++ .../src/thrift/server/TNonblockingServer.cpp | 43 +- lib/cpp/src/thrift/server/TNonblockingServer.h | 5 + lib/cpp/src/thrift/server/TServerFramework.cpp | 9 +- .../src/thrift/transport/TBufferTransports.h | 10 + .../src/thrift/transport/THeaderTransport.cpp | 597 +++++++++++++++++++ lib/cpp/src/thrift/transport/THeaderTransport.h | 290 +++++++++ lib/cpp/test/CMakeLists.txt | 23 +- test/cpp/CMakeLists.txt | 9 +- test/cpp/Makefile.am | 4 +- test/cpp/src/TestClient.cpp | 7 +- test/cpp/src/TestServer.cpp | 14 +- test/known_failures_Linux.json | 2 + test/tests.json | 3 +- tutorial/cpp/CMakeLists.txt | 2 + 23 files changed, 1593 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/doc/specs/HeaderFormat.md ---------------------------------------------------------------------- diff --git a/doc/specs/HeaderFormat.md b/doc/specs/HeaderFormat.md new file mode 100644 index 0000000..42ec7ae --- /dev/null +++ b/doc/specs/HeaderFormat.md @@ -0,0 +1,82 @@ +<link href="http://kevinburke.bitbucket.org/markdowncss/markdown.css" rel="stylesheet"></link> + +Header format for the THeader.h +=============================== + + 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f + +----------------------------------------------------------------+ + | 0| LENGTH | + +----------------------------------------------------------------+ + | 0| HEADER MAGIC | FLAGS | + +----------------------------------------------------------------+ + | SEQUENCE NUMBER | + +----------------------------------------------------------------+ + | 0| Header Size(/32) | ... + +--------------------------------- + + Header is of variable size: + (and starts at offset 14) + + +----------------------------------------------------------------+ + | PROTOCOL ID (varint) | NUM TRANSFORMS (varint) | + +----------------------------------------------------------------+ + | TRANSFORM 0 ID (varint) | TRANSFORM 0 DATA ... + +----------------------------------------------------------------+ + | ... ... | + +----------------------------------------------------------------+ + | INFO 0 ID (varint) | INFO 0 DATA ... + +----------------------------------------------------------------+ + | ... ... | + +----------------------------------------------------------------+ + | | + | PAYLOAD | + | | + +----------------------------------------------------------------+ + +The `LENGTH` field is 32 bits, and counts the remaining bytes in the +packet, NOT including the length field. The header size field is 16 +bits, and defines the size of the header remaining NOT including the +`HEADER MAGIC`, `FLAGS`, `SEQUENCE NUMBER` and header size fields. The +Header size field is in bytes/4. + +The transform ID's are varints. The data for each transform is +defined by the transform ID in the code - no size is given in the +header. If a transform ID is specified from a client and the server +doesn't know about the transform ID, an error MUST be returned as we +don't know how to transform the data. + +Conversely, data in the info headers is ignorable. This should only +be things like timestamps, debuging tracing, etc. Using the header +size you should be able to skip this data and read the payload safely +if you don't know the info ID. + +Info's should be oldest supported to newest supported order, so that +if we read an info ID we don't support, none of the remaining info +ID's will be supported either, and we can safely skip to the payload. + +Info ID's and transform ID's should share the same ID space. + +### PADDING: + +Header will be padded out to next 4-byte boundary with `0x00`. + +Max frame size is `0x3FFFFFFF`, which is slightly less than `HTTP_MAGIC`. +This allows us to distingush between different (older) transports. + +### Transform IDs: + + ZLIB_TRANSFORM 0x01 - No data for this. Use zlib to (de)compress the + data. + + HMAC_TRANSFORM 0x02 - Variable amount of mac data. One byte to specify + size. Mac data is appended at the end of the packet. + SNAPPY_TRANSFORM 0x03 - No data for this. Use snappy to (de)compress the + data. + + +###Info IDs: + + INFO_KEYVALUE 0x01 - varint32 number of headers. + - key/value pairs of varstrings (varint16 length plus + no-trailing-null string). + http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/c_glib/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/c_glib/test/CMakeLists.txt b/lib/c_glib/test/CMakeLists.txt index 31e6c6b..1b32c46 100644 --- a/lib/c_glib/test/CMakeLists.txt +++ b/lib/c_glib/test/CMakeLists.txt @@ -116,7 +116,7 @@ if(BUILD_CPP) include_directories("${CMAKE_CURRENT_BINARY_DIR}/gen-cpp" "${CMAKE_CURRENT_BINARY_DIR}/gen-c_glib") add_executable(testthrifttestclient testthrifttestclient.cpp) - target_link_libraries(testthrifttestclient testgenc testgenc_cpp) + target_link_libraries(testthrifttestclient testgenc testgenc_cpp ${ZLIB_LIBRARIES}) add_test(NAME testthrifttestclient COMMAND testthrifttestclient) endif(BUILD_CPP) http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt index 4c7caeb..a0b9743 100755 --- a/lib/cpp/CMakeLists.txt +++ b/lib/cpp/CMakeLists.txt @@ -46,6 +46,7 @@ set( thriftcpp_SOURCES src/thrift/protocol/TJSONProtocol.cpp src/thrift/protocol/TMultiplexedProtocol.cpp src/thrift/protocol/TProtocol.cpp + src/thrift/protocol/THeaderProtocol.cpp src/thrift/transport/TTransportException.cpp src/thrift/transport/TFDTransport.cpp src/thrift/transport/TSimpleFileTransport.cpp @@ -57,6 +58,7 @@ set( thriftcpp_SOURCES src/thrift/transport/TServerSocket.cpp src/thrift/transport/TTransportUtils.cpp src/thrift/transport/TBufferTransports.cpp + src/thrift/transport/THeaderTransport.cpp src/thrift/server/TConnectedClient.cpp src/thrift/server/TServerFramework.cpp src/thrift/server/TSimpleServer.cpp @@ -146,6 +148,8 @@ set( thriftcppnb_SOURCES # Thrift zlib server set( thriftcppz_SOURCES src/thrift/transport/TZlibTransport.cpp + src/thrift/protocol/THeaderProtocol.cpp + src/thrift/transport/THeaderTransport.cpp ) # Thrift Qt4 server http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am index 4742ee0..80e8917 100755 --- a/lib/cpp/Makefile.am +++ b/lib/cpp/Makefile.am @@ -114,7 +114,10 @@ libthriftnb_la_SOURCES = src/thrift/server/TNonblockingServer.cpp \ src/thrift/async/TEvhttpServer.cpp \ src/thrift/async/TEvhttpClientChannel.cpp -libthriftz_la_SOURCES = src/thrift/transport/TZlibTransport.cpp +libthriftz_la_SOURCES = src/thrift/transport/TZlibTransport.cpp \ + src/thrift/transport/THeaderTransport.cpp \ + src/thrift/protocol/THeaderProtocol.cpp + libthriftqt_la_MOC = src/thrift/qt/moc_TQTcpServer.cpp nodist_libthriftqt_la_SOURCES = $(libthriftqt_la_MOC) @@ -183,11 +186,13 @@ include_protocol_HEADERS = \ src/thrift/protocol/TCompactProtocol.h \ src/thrift/protocol/TCompactProtocol.tcc \ src/thrift/protocol/TDebugProtocol.h \ + src/thrift/protocol/THeaderProtocol.h \ src/thrift/protocol/TBase64Utils.h \ src/thrift/protocol/TJSONProtocol.h \ src/thrift/protocol/TMultiplexedProtocol.h \ src/thrift/protocol/TProtocolDecorator.h \ src/thrift/protocol/TProtocolTap.h \ + src/thrift/protocol/TProtocolTypes.h \ src/thrift/protocol/TProtocolException.h \ src/thrift/protocol/TVirtualProtocol.h \ src/thrift/protocol/TProtocol.h @@ -197,6 +202,7 @@ include_transport_HEADERS = \ src/thrift/transport/PlatformSocket.h \ src/thrift/transport/TFDTransport.h \ src/thrift/transport/TFileTransport.h \ + src/thrift/transport/THeaderTransport.h \ src/thrift/transport/TSimpleFileTransport.h \ src/thrift/transport/TServerSocket.h \ src/thrift/transport/TSSLServerSocket.h \ http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/protocol/TBinaryProtocol.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/TBinaryProtocol.h b/lib/cpp/src/thrift/protocol/TBinaryProtocol.h index e0650cf..92491b0 100644 --- a/lib/cpp/src/thrift/protocol/TBinaryProtocol.h +++ b/lib/cpp/src/thrift/protocol/TBinaryProtocol.h @@ -36,12 +36,11 @@ namespace protocol { */ template <class Transport_, class ByteOrder_ = TNetworkBigEndian> class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > { -protected: +public: static const int32_t VERSION_MASK = ((int32_t)0xffff0000); static const int32_t VERSION_1 = ((int32_t)0x80010000); // VERSION_2 (0x80020000) was taken by TDenseProtocol (which has since been removed) -public: TBinaryProtocolT(boost::shared_ptr<Transport_> trans) : TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> >(trans), trans_(trans.get()), http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp b/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp new file mode 100644 index 0000000..76732b0 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_ +#define THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_ 1 + +#include <thrift/protocol/THeaderProtocol.h> +#include <thrift/protocol/TCompactProtocol.h> +#include <thrift/TApplicationException.h> + +#include <limits> +#include <boost/static_assert.hpp> + +namespace apache { +namespace thrift { +namespace protocol { + +void THeaderProtocol::resetProtocol() { + if (proto_ && protoId_ == trans_->getProtocolId()) { + return; + } + + protoId_ = trans_->getProtocolId(); + + switch (protoId_) { + case T_BINARY_PROTOCOL: + proto_ = boost::make_shared<TBinaryProtocolT<THeaderTransport> >(trans_); + break; + + case T_COMPACT_PROTOCOL: + proto_ = boost::make_shared<TCompactProtocolT<THeaderTransport> >(trans_); + break; + + default: + throw TApplicationException(TApplicationException::INVALID_PROTOCOL, + "Unknown protocol requested"); + } +} + +uint32_t THeaderProtocol::writeMessageBegin(const std::string& name, + const TMessageType messageType, + const int32_t seqId) { + resetProtocol(); // Reset in case we changed protocols + trans_->setSequenceNumber(seqId); + return proto_->writeMessageBegin(name, messageType, seqId); +} + +uint32_t THeaderProtocol::writeMessageEnd() { + return proto_->writeMessageEnd(); +} + +uint32_t THeaderProtocol::writeStructBegin(const char* name) { + return proto_->writeStructBegin(name); +} + +uint32_t THeaderProtocol::writeStructEnd() { + return proto_->writeStructEnd(); +} + +uint32_t THeaderProtocol::writeFieldBegin(const char* name, + const TType fieldType, + const int16_t fieldId) { + return proto_->writeFieldBegin(name, fieldType, fieldId); +} + +uint32_t THeaderProtocol::writeFieldEnd() { + return proto_->writeFieldEnd(); +} + +uint32_t THeaderProtocol::writeFieldStop() { + return proto_->writeFieldStop(); +} + +uint32_t THeaderProtocol::writeMapBegin(const TType keyType, + const TType valType, + const uint32_t size) { + return proto_->writeMapBegin(keyType, valType, size); +} + +uint32_t THeaderProtocol::writeMapEnd() { + return proto_->writeMapEnd(); +} + +uint32_t THeaderProtocol::writeListBegin(const TType elemType, const uint32_t size) { + return proto_->writeListBegin(elemType, size); +} + +uint32_t THeaderProtocol::writeListEnd() { + return proto_->writeListEnd(); +} + +uint32_t THeaderProtocol::writeSetBegin(const TType elemType, const uint32_t size) { + return proto_->writeSetBegin(elemType, size); +} + +uint32_t THeaderProtocol::writeSetEnd() { + return proto_->writeSetEnd(); +} + +uint32_t THeaderProtocol::writeBool(const bool value) { + return proto_->writeBool(value); +} + +uint32_t THeaderProtocol::writeByte(const int8_t byte) { + return proto_->writeByte(byte); +} + +uint32_t THeaderProtocol::writeI16(const int16_t i16) { + return proto_->writeI16(i16); +} + +uint32_t THeaderProtocol::writeI32(const int32_t i32) { + return proto_->writeI32(i32); +} + +uint32_t THeaderProtocol::writeI64(const int64_t i64) { + return proto_->writeI64(i64); +} + +uint32_t THeaderProtocol::writeDouble(const double dub) { + return proto_->writeDouble(dub); +} + +uint32_t THeaderProtocol::writeString(const std::string& str) { + return proto_->writeString(str); +} + +uint32_t THeaderProtocol::writeBinary(const std::string& str) { + return proto_->writeBinary(str); +} + +/** + * Reading functions + */ + +uint32_t THeaderProtocol::readMessageBegin(std::string& name, + TMessageType& messageType, + int32_t& seqId) { + // Read the next frame, and change protocols if needed + try { + trans_->resetProtocol(); + resetProtocol(); + } catch (const TApplicationException& ex) { + writeMessageBegin("", T_EXCEPTION, 0); + ex.write((TProtocol*)this); + writeMessageEnd(); + trans_->flush(); + + // The framing is still good, but we don't know about this protocol. + // In the future, this could be made a client-side only error if + // connection pooling is used. + throw ex; + } + return proto_->readMessageBegin(name, messageType, seqId); +} + +uint32_t THeaderProtocol::readMessageEnd() { + return proto_->readMessageEnd(); +} + +uint32_t THeaderProtocol::readStructBegin(std::string& name) { + return proto_->readStructBegin(name); +} + +uint32_t THeaderProtocol::readStructEnd() { + return proto_->readStructEnd(); +} + +uint32_t THeaderProtocol::readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) { + return proto_->readFieldBegin(name, fieldType, fieldId); +} + +uint32_t THeaderProtocol::readFieldEnd() { + return proto_->readFieldEnd(); +} + +uint32_t THeaderProtocol::readMapBegin(TType& keyType, TType& valType, uint32_t& size) { + return proto_->readMapBegin(keyType, valType, size); +} + +uint32_t THeaderProtocol::readMapEnd() { + return proto_->readMapEnd(); +} + +uint32_t THeaderProtocol::readListBegin(TType& elemType, uint32_t& size) { + return proto_->readListBegin(elemType, size); +} + +uint32_t THeaderProtocol::readListEnd() { + return proto_->readListEnd(); +} + +uint32_t THeaderProtocol::readSetBegin(TType& elemType, uint32_t& size) { + return proto_->readSetBegin(elemType, size); +} + +uint32_t THeaderProtocol::readSetEnd() { + return proto_->readSetEnd(); +} + +uint32_t THeaderProtocol::readBool(bool& value) { + return proto_->readBool(value); +} + +uint32_t THeaderProtocol::readByte(int8_t& byte) { + return proto_->readByte(byte); +} + +uint32_t THeaderProtocol::readI16(int16_t& i16) { + return proto_->readI16(i16); +} + +uint32_t THeaderProtocol::readI32(int32_t& i32) { + return proto_->readI32(i32); +} + +uint32_t THeaderProtocol::readI64(int64_t& i64) { + return proto_->readI64(i64); +} + +uint32_t THeaderProtocol::readDouble(double& dub) { + return proto_->readDouble(dub); +} + +uint32_t THeaderProtocol::readString(std::string& str) { + return proto_->readString(str); +} + +uint32_t THeaderProtocol::readBinary(std::string& binary) { + return proto_->readBinary(binary); +} +} +} +} // apache::thrift::protocol + +#endif // #ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_ http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/protocol/THeaderProtocol.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/THeaderProtocol.h b/lib/cpp/src/thrift/protocol/THeaderProtocol.h new file mode 100644 index 0000000..3998f48 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/THeaderProtocol.h @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_H_ +#define THRIFT_PROTOCOL_THEADERPROTOCOL_H_ 1 + +#include <thrift/protocol/TProtocol.h> +#include <thrift/protocol/TProtocolTypes.h> +#include <thrift/protocol/TVirtualProtocol.h> +#include <thrift/transport/THeaderTransport.h> + +#include <bitset> + +#include <boost/shared_ptr.hpp> +#include <boost/make_shared.hpp> +using apache::thrift::transport::THeaderTransport; + +namespace apache { +namespace thrift { +namespace protocol { + +/** + * The header protocol for thrift. Reads unframed, framed, header format, + * and http + * + */ +class THeaderProtocol : public TVirtualProtocol<THeaderProtocol> { +protected: +public: + void resetProtocol(); + + explicit THeaderProtocol(const boost::shared_ptr<TTransport>& trans, + uint16_t protoId = T_COMPACT_PROTOCOL) + : TVirtualProtocol<THeaderProtocol>(boost::shared_ptr<TTransport>(new THeaderTransport(trans))), + trans_(boost::dynamic_pointer_cast<THeaderTransport>(this->getTransport())), + protoId_(protoId) { + trans_->setProtocolId(protoId); + resetProtocol(); + } + + THeaderProtocol(const boost::shared_ptr<TTransport>& inTrans, + const boost::shared_ptr<TTransport>& outTrans, + uint16_t protoId = T_COMPACT_PROTOCOL) + : TVirtualProtocol<THeaderProtocol>( + boost::shared_ptr<TTransport>(new THeaderTransport(inTrans, outTrans))), + trans_(boost::dynamic_pointer_cast<THeaderTransport>(this->getTransport())), + protoId_(protoId) { + trans_->setProtocolId(protoId); + resetProtocol(); + } + + ~THeaderProtocol() {} + + /** + * Functions to work with headers by calling into THeaderTransport + */ + void setProtocolId(uint16_t protoId) { + trans_->setProtocolId(protoId); + resetProtocol(); + } + + typedef THeaderTransport::StringToStringMap StringToStringMap; + + // these work with write headers + void setHeader(const std::string& key, const std::string& value) { + trans_->setHeader(key, value); + } + + void clearHeaders() { trans_->clearHeaders(); } + + StringToStringMap& getWriteHeaders() { return trans_->getWriteHeaders(); } + + // these work with read headers + const StringToStringMap& getHeaders() const { return trans_->getHeaders(); } + + /** + * Writing functions. + */ + + /*ol*/ uint32_t writeMessageBegin(const std::string& name, + const TMessageType messageType, + const int32_t seqId); + + /*ol*/ uint32_t writeMessageEnd(); + + uint32_t writeStructBegin(const char* name); + + uint32_t writeStructEnd(); + + uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId); + + uint32_t writeFieldEnd(); + + uint32_t writeFieldStop(); + + uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size); + + uint32_t writeMapEnd(); + + uint32_t writeListBegin(const TType elemType, const uint32_t size); + + uint32_t writeListEnd(); + + uint32_t writeSetBegin(const TType elemType, const uint32_t size); + + uint32_t writeSetEnd(); + + uint32_t writeBool(const bool value); + + uint32_t writeByte(const int8_t byte); + + uint32_t writeI16(const int16_t i16); + + uint32_t writeI32(const int32_t i32); + + uint32_t writeI64(const int64_t i64); + + uint32_t writeDouble(const double dub); + + uint32_t writeString(const std::string& str); + + uint32_t writeBinary(const std::string& str); + + /** + * Reading functions + */ + + /*ol*/ uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqId); + + /*ol*/ uint32_t readMessageEnd(); + + uint32_t readStructBegin(std::string& name); + + uint32_t readStructEnd(); + + uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId); + + uint32_t readFieldEnd(); + + uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size); + + uint32_t readMapEnd(); + + uint32_t readListBegin(TType& elemType, uint32_t& size); + + uint32_t readListEnd(); + + uint32_t readSetBegin(TType& elemType, uint32_t& size); + + uint32_t readSetEnd(); + + uint32_t readBool(bool& value); + // Provide the default readBool() implementation for std::vector<bool> + using TVirtualProtocol<THeaderProtocol>::readBool; + + uint32_t readByte(int8_t& byte); + + uint32_t readI16(int16_t& i16); + + uint32_t readI32(int32_t& i32); + + uint32_t readI64(int64_t& i64); + + uint32_t readDouble(double& dub); + + uint32_t readString(std::string& str); + + uint32_t readBinary(std::string& binary); + +protected: + boost::shared_ptr<THeaderTransport> trans_; + + boost::shared_ptr<TProtocol> proto_; + uint32_t protoId_; +}; + +class THeaderProtocolFactory : public TProtocolFactory { +public: + virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<transport::TTransport> trans) { + THeaderProtocol* headerProtocol + = new THeaderProtocol(trans, boost::shared_ptr<transport::TTransport>(), T_BINARY_PROTOCOL); + return boost::shared_ptr<TProtocol>(headerProtocol); + } + + virtual boost::shared_ptr<TProtocol> getProtocol( + boost::shared_ptr<transport::TTransport> inTrans, + boost::shared_ptr<transport::TTransport> outTrans) { + THeaderProtocol* headerProtocol = new THeaderProtocol(inTrans, outTrans, T_BINARY_PROTOCOL); + return boost::shared_ptr<TProtocol>(headerProtocol); + } +}; +} +} +} // apache::thrift::protocol + +#endif // #ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_H_ http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/protocol/TProtocol.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/TProtocol.h b/lib/cpp/src/thrift/protocol/TProtocol.h index 0db2216..1b46faf 100644 --- a/lib/cpp/src/thrift/protocol/TProtocol.h +++ b/lib/cpp/src/thrift/protocol/TProtocol.h @@ -595,6 +595,11 @@ public: virtual ~TProtocolFactory(); virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0; + virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> inTrans, + boost::shared_ptr<TTransport> outTrans) { + (void)outTrans; + return getProtocol(inTrans); + } }; /** http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/protocol/TProtocolTypes.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/protocol/TProtocolTypes.h b/lib/cpp/src/thrift/protocol/TProtocolTypes.h new file mode 100644 index 0000000..ca22b54 --- /dev/null +++ b/lib/cpp/src/thrift/protocol/TProtocolTypes.h @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_PROTOCOL_TPROTOCOLTYPES_H_ +#define THRIFT_PROTOCOL_TPROTOCOLTYPES_H_ 1 + +namespace apache { namespace thrift { namespace protocol { + +enum PROTOCOL_TYPES { + T_BINARY_PROTOCOL = 0, + T_JSON_PROTOCOL = 1, + T_COMPACT_PROTOCOL = 2, +}; + +}}} // apache::thrift::protocol + +#endif // #define _THRIFT_PROTOCOL_TPROTOCOLTYPES_H_ 1 + http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/server/TNonblockingServer.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index d1e9ede..ede34c4 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -392,8 +392,14 @@ void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket, factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_); // Create protocol - inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_); - outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); + if (server_->getHeaderTransport()) { + inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_, + factoryOutputTransport_); + outputProtocol_ = inputProtocol_; + } else { + inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_); + outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_); + } // Set up for any server event handler serverEventHandler_ = server_->getEventHandler(); @@ -535,6 +541,14 @@ void TNonblockingServer::TConnection::workSocket() { } } +bool TNonblockingServer::getHeaderTransport() { + // Currently if there is no output protocol factory, + // we assume header transport (without having to create + // a new transport and check) + return getOutputProtocolFactory() == NULL; +} + + /** * This is called when the application transitions from one state into * another. This means that it has finished writing the data that it needed @@ -551,12 +565,20 @@ void TNonblockingServer::TConnection::transition() { case APP_READ_REQUEST: // We are done reading the request, package the read buffer into transport // and get back some data from the dispatch function - inputTransport_->resetBuffer(readBuffer_, readBufferPos_); - outputTransport_->resetBuffer(); - // Prepend four bytes of blank space to the buffer so we can - // write the frame size there later. - outputTransport_->getWritePtr(4); - outputTransport_->wroteBytes(4); + if (server_->getHeaderTransport()) { + inputTransport_->resetBuffer(readBuffer_, readBufferPos_); + outputTransport_->resetBuffer(); + } else { + // We saved room for the framing size in case header transport needed it, + // but just skip it for the non-header case + inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4); + outputTransport_->resetBuffer(); + + // Prepend four bytes of blank space to the buffer so we can + // write the frame size there later. + outputTransport_->getWritePtr(4); + outputTransport_->wroteBytes(4); + } server_->incrementActiveProcessors(); @@ -691,6 +713,8 @@ void TNonblockingServer::TConnection::transition() { return; case APP_READ_FRAME_SIZE: + readWant_ += 4; + // We just read the request length // Double the buffer size until it is big enough if (readWant_ > readBufferSize_) { @@ -711,7 +735,8 @@ void TNonblockingServer::TConnection::transition() { readBufferSize_ = newSize; } - readBufferPos_ = 0; + readBufferPos_ = 4; + *((uint32_t*)readBuffer_) = htonl(readWant_ - 4); // Move into read request state socketState_ = SOCKET_RECV; http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/server/TNonblockingServer.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h index 4fb83f1..82d40e9 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.h +++ b/lib/cpp/src/thrift/server/TNonblockingServer.h @@ -712,6 +712,11 @@ public: */ event_base* getUserEventBase() const { return userEventBase_; } + /** Some transports, like THeaderTransport, require passing through + * the framing size instead of stripping it. + */ + bool getHeaderTransport(); + private: /** * Callback function that the threadmanager calls when a task reaches http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/server/TServerFramework.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp index e843921..43d11c5 100644 --- a/lib/cpp/src/thrift/server/TServerFramework.cpp +++ b/lib/cpp/src/thrift/server/TServerFramework.cpp @@ -147,8 +147,13 @@ void TServerFramework::serve() { inputTransport = inputTransportFactory_->getTransport(client); outputTransport = outputTransportFactory_->getTransport(client); - inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); - outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + if (!outputProtocolFactory_) { + inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport); + outputProtocol = inputProtocol; + } else { + inputProtocol = inputProtocolFactory_->getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_->getProtocol(outputTransport); + } newlyConnectedClient(shared_ptr<TConnectedClient>( new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client), http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/transport/TBufferTransports.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/TBufferTransports.h b/lib/cpp/src/thrift/transport/TBufferTransports.h index 98e8a9e..013c6e0 100644 --- a/lib/cpp/src/thrift/transport/TBufferTransports.h +++ b/lib/cpp/src/thrift/transport/TBufferTransports.h @@ -309,6 +309,16 @@ public: static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; /// Use default buffer sizes. + TFramedTransport() + : transport_(), + rBufSize_(0), + wBufSize_(DEFAULT_BUFFER_SIZE), + rBuf_(), + wBuf_(new uint8_t[wBufSize_]), + bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { + initPointers(); + } + TFramedTransport(boost::shared_ptr<TTransport> transport) : transport_(transport), rBufSize_(0), http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/transport/THeaderTransport.cpp ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/THeaderTransport.cpp b/lib/cpp/src/thrift/transport/THeaderTransport.cpp new file mode 100644 index 0000000..26841b9 --- /dev/null +++ b/lib/cpp/src/thrift/transport/THeaderTransport.cpp @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <thrift/transport/THeaderTransport.h> +#include <thrift/TApplicationException.h> +#include <thrift/protocol/TProtocolTypes.h> + +#include <algorithm> +#include <bitset> +#include <cassert> +#include <string> +#include <zlib.h> + +using std::map; +using boost::shared_ptr; +using std::string; +using std::vector; + +namespace apache { +namespace thrift { +namespace transport { + +using namespace apache::thrift::protocol; +using apache::thrift::protocol::TBinaryProtocol; + +uint32_t THeaderTransport::readAll(uint8_t* buf, uint32_t len) { + // We want to call TBufferBase's version here, because + // TFramedTransport would try and call its own readFrame function + return TBufferBase::readAll(buf, len); +} + +uint32_t THeaderTransport::readSlow(uint8_t* buf, uint32_t len) { + + if (clientType == THRIFT_UNFRAMED_DEPRECATED) { + return transport_->read(buf, len); + } + + return TFramedTransport::readSlow(buf, len); +} + +uint16_t THeaderTransport::getProtocolId() const { + if (clientType == THRIFT_HEADER_CLIENT_TYPE) { + return protoId; + } else { + return T_BINARY_PROTOCOL; // Assume other transports use TBinary + } +} + +void THeaderTransport::ensureReadBuffer(uint32_t sz) { + if (sz > rBufSize_) { + rBuf_.reset(new uint8_t[sz]); + rBufSize_ = sz; + } +} + +bool THeaderTransport::readFrame(uint32_t minFrameSize) { + // szN is network byte order of sz + uint32_t szN; + uint32_t sz; + + // Read the size of the next frame. + // We can't use readAll(&sz, sizeof(sz)), since that always throws an + // exception on EOF. We want to throw an exception only if EOF occurs after + // partial size data. + uint32_t sizeBytesRead = 0; + while (sizeBytesRead < sizeof(szN)) { + uint8_t* szp = reinterpret_cast<uint8_t*>(&szN) + sizeBytesRead; + uint32_t bytesRead = transport_->read(szp, sizeof(szN) - sizeBytesRead); + if (bytesRead == 0) { + if (sizeBytesRead == 0) { + // EOF before any data was read. + return false; + } else { + // EOF after a partial frame header. Raise an exception. + throw TTransportException(TTransportException::END_OF_FILE, + "No more data to read after " + "partial frame header."); + } + } + sizeBytesRead += bytesRead; + } + + sz = ntohl(szN); + + ensureReadBuffer(minFrameSize + 4); + + if ((sz & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) { + // unframed + clientType = THRIFT_UNFRAMED_DEPRECATED; + memcpy(rBuf_.get(), &szN, sizeof(szN)); + if (minFrameSize > 4) { + transport_->readAll(rBuf_.get() + 4, minFrameSize - 4); + setReadBuffer(rBuf_.get(), minFrameSize); + } else { + setReadBuffer(rBuf_.get(), 4); + } + } else { + // Could be header format or framed. Check next uint32 + uint32_t magic_n; + uint32_t magic; + + if (sz > MAX_FRAME_SIZE) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header transport frame is too large"); + } + + ensureReadBuffer(sz); + + // We can use readAll here, because it would be an invalid frame otherwise + transport_->readAll(reinterpret_cast<uint8_t*>(&magic_n), sizeof(magic_n)); + memcpy(rBuf_.get(), &magic_n, sizeof(magic_n)); + magic = ntohl(magic_n); + + if ((magic & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) { + // framed + clientType = THRIFT_FRAMED_DEPRECATED; + transport_->readAll(rBuf_.get() + 4, sz - 4); + setReadBuffer(rBuf_.get(), sz); + } else if (HEADER_MAGIC == (magic & HEADER_MASK)) { + if (sz < 10) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header transport frame is too small"); + } + + transport_->readAll(rBuf_.get() + 4, sz - 4); + + // header format + clientType = THRIFT_HEADER_CLIENT_TYPE; + // flags + flags = magic & FLAGS_MASK; + // seqId + uint32_t seqId_n; + memcpy(&seqId_n, rBuf_.get() + 4, sizeof(seqId_n)); + seqId = ntohl(seqId_n); + // header size + uint16_t headerSize_n; + memcpy(&headerSize_n, rBuf_.get() + 8, sizeof(headerSize_n)); + uint16_t headerSize = ntohs(headerSize_n); + setReadBuffer(rBuf_.get(), sz); + readHeaderFormat(headerSize, sz); + } else { + clientType = THRIFT_UNKNOWN_CLIENT_TYPE; + throw TTransportException(TTransportException::BAD_ARGS, + "Could not detect client transport type"); + } + } + + return true; +} + +/** + * Reads a string from ptr, taking care not to reach headerBoundary + * Advances ptr on success + * + * @param str output string + * @throws CORRUPTED_DATA if size of string exceeds boundary + */ +void THeaderTransport::readString(uint8_t*& ptr, + /* out */ string& str, + uint8_t const* headerBoundary) { + int32_t strLen; + + uint32_t bytes = readVarint32(ptr, &strLen, headerBoundary); + if (strLen > headerBoundary - ptr) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Info header length exceeds header size"); + } + ptr += bytes; + str.assign(reinterpret_cast<const char*>(ptr), strLen); + ptr += strLen; +} + +void THeaderTransport::readHeaderFormat(uint16_t headerSize, uint32_t sz) { + readTrans_.clear(); // Clear out any previous transforms. + readHeaders_.clear(); // Clear out any previous headers. + + // skip over already processed magic(4), seqId(4), headerSize(2) + uint8_t* ptr = reinterpret_cast<uint8_t*>(rBuf_.get() + 10); + + // Catch integer overflow, check for reasonable header size + assert(headerSize < 16384); + headerSize *= 4; + const uint8_t* const headerBoundary = ptr + headerSize; + if (headerSize > sz) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header size is larger than frame"); + } + uint8_t* data = ptr + headerSize; + ptr += readVarint16(ptr, &protoId, headerBoundary); + int16_t numTransforms; + ptr += readVarint16(ptr, &numTransforms, headerBoundary); + + // For now all transforms consist of only the ID, not data. + for (int i = 0; i < numTransforms; i++) { + int32_t transId; + ptr += readVarint32(ptr, &transId, headerBoundary); + + readTrans_.push_back(transId); + } + + // Info headers + while (ptr < headerBoundary) { + int32_t infoId; + ptr += readVarint32(ptr, &infoId, headerBoundary); + + if (infoId == 0) { + // header padding + break; + } + if (infoId >= infoIdType::END) { + // cannot handle infoId + break; + } + switch (infoId) { + case infoIdType::KEYVALUE: + // Process key-value headers + uint32_t numKVHeaders; + ptr += readVarint32(ptr, (int32_t*)&numKVHeaders, headerBoundary); + // continue until we reach (padded) end of packet + while (numKVHeaders-- && ptr < headerBoundary) { + // format: key; value + // both: length (varint32); value (string) + string key, value; + readString(ptr, key, headerBoundary); + // value + readString(ptr, value, headerBoundary); + // save to headers + readHeaders_[key] = value; + } + break; + } + } + + // Untransform the data section. rBuf will contain result. + untransform(data, sz - (data - rBuf_.get())); // ignore header in size calc +} + +void THeaderTransport::untransform(uint8_t* ptr, uint32_t sz) { + // Update the transform buffer size if needed + resizeTransformBuffer(); + + for (vector<uint16_t>::const_iterator it = readTrans_.begin(); it != readTrans_.end(); ++it) { + const uint16_t transId = *it; + + if (transId == ZLIB_TRANSFORM) { + z_stream stream; + int err; + + stream.next_in = ptr; + stream.avail_in = sz; + + // Setting these to 0 means use the default free/alloc functions + stream.zalloc = (alloc_func)0; + stream.zfree = (free_func)0; + stream.opaque = (voidpf)0; + err = inflateInit(&stream); + if (err != Z_OK) { + throw TApplicationException(TApplicationException::MISSING_RESULT, + "Error while zlib deflateInit"); + } + stream.next_out = tBuf_.get(); + stream.avail_out = tBufSize_; + err = inflate(&stream, Z_FINISH); + if (err != Z_STREAM_END || stream.avail_out == 0) { + throw TApplicationException(TApplicationException::MISSING_RESULT, + "Error while zlib deflate"); + } + sz = stream.total_out; + + err = inflateEnd(&stream); + if (err != Z_OK) { + throw TApplicationException(TApplicationException::MISSING_RESULT, + "Error while zlib deflateEnd"); + } + + memcpy(ptr, tBuf_.get(), sz); + } else { + throw TApplicationException(TApplicationException::MISSING_RESULT, "Unknown transform"); + } + } + + setReadBuffer(ptr, sz); +} + +/** + * We may have updated the wBuf size, update the tBuf size to match. + * Should be called in transform. + * + * The buffer should be slightly larger than write buffer size due to + * compression transforms (that may slightly grow on small frame sizes) + */ +void THeaderTransport::resizeTransformBuffer(uint32_t additionalSize) { + if (tBufSize_ < wBufSize_ + DEFAULT_BUFFER_SIZE) { + uint32_t new_size = wBufSize_ + DEFAULT_BUFFER_SIZE + additionalSize; + uint8_t* new_buf = new uint8_t[new_size]; + tBuf_.reset(new_buf); + tBufSize_ = new_size; + } +} + +void THeaderTransport::transform(uint8_t* ptr, uint32_t sz) { + // Update the transform buffer size if needed + resizeTransformBuffer(); + + for (vector<uint16_t>::const_iterator it = writeTrans_.begin(); it != writeTrans_.end(); ++it) { + const uint16_t transId = *it; + + if (transId == ZLIB_TRANSFORM) { + z_stream stream; + int err; + + stream.next_in = ptr; + stream.avail_in = sz; + + stream.zalloc = (alloc_func)0; + stream.zfree = (free_func)0; + stream.opaque = (voidpf)0; + err = deflateInit(&stream, Z_DEFAULT_COMPRESSION); + if (err != Z_OK) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Error while zlib deflateInit"); + } + uint32_t tbuf_size = 0; + while (err == Z_OK) { + resizeTransformBuffer(tbuf_size); + + stream.next_out = tBuf_.get(); + stream.avail_out = tBufSize_; + err = deflate(&stream, Z_FINISH); + tbuf_size += DEFAULT_BUFFER_SIZE; + } + sz = stream.total_out; + + err = deflateEnd(&stream); + if (err != Z_OK) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Error while zlib deflateEnd"); + } + + memcpy(ptr, tBuf_.get(), sz); + } else { + throw TTransportException(TTransportException::CORRUPTED_DATA, "Unknown transform"); + } + } + + wBase_ = wBuf_.get() + sz; +} + +void THeaderTransport::resetProtocol() { + // Set to anything except HTTP type so we don't flush again + clientType = THRIFT_HEADER_CLIENT_TYPE; + + // Read the header and decide which protocol to go with + readFrame(0); +} + +uint32_t THeaderTransport::getWriteBytes() { + return wBase_ - wBuf_.get(); +} + +/** + * Writes a string to a byte buffer, as size (varint32) + string (non-null + * terminated) + * Automatically advances ptr to after the written portion + */ +void THeaderTransport::writeString(uint8_t*& ptr, const string& str) { + uint32_t strLen = str.length(); + ptr += writeVarint32(strLen, ptr); + memcpy(ptr, str.c_str(), strLen); // no need to write \0 + ptr += strLen; +} + +void THeaderTransport::setHeader(const string& key, const string& value) { + writeHeaders_[key] = value; +} + +size_t THeaderTransport::getMaxWriteHeadersSize() const { + size_t maxWriteHeadersSize = 0; + THeaderTransport::StringToStringMap::const_iterator it; + for (it = writeHeaders_.begin(); it != writeHeaders_.end(); ++it) { + // add sizes of key and value to maxWriteHeadersSize + // 2 varints32 + the strings themselves + maxWriteHeadersSize += 5 + 5 + (it->first).length() + (it->second).length(); + } + return maxWriteHeadersSize; +} + +void THeaderTransport::clearHeaders() { + writeHeaders_.clear(); +} + +void THeaderTransport::flush() { + // Write out any data waiting in the write buffer. + uint32_t haveBytes = getWriteBytes(); + + if (clientType == THRIFT_HEADER_CLIENT_TYPE) { + transform(wBuf_.get(), haveBytes); + haveBytes = getWriteBytes(); // transform may have changed the size + } + + // Note that we reset wBase_ prior to the underlying write + // to ensure we're in a sane state (i.e. internal buffer cleaned) + // if the underlying write throws up an exception + wBase_ = wBuf_.get(); + + if (haveBytes > MAX_FRAME_SIZE) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Attempting to send frame that is too large"); + } + + if (clientType == THRIFT_HEADER_CLIENT_TYPE) { + // header size will need to be updated at the end because of varints. + // Make it big enough here for max varint size, plus 4 for padding. + int headerSize = (2 + getNumTransforms()) * THRIFT_MAX_VARINT32_BYTES + 4; + // add approximate size of info headers + headerSize += getMaxWriteHeadersSize(); + + // Pkt size + uint32_t maxSzHbo = headerSize + haveBytes // thrift header + payload + + 10; // common header section + uint8_t* pkt = tBuf_.get(); + uint8_t* headerStart; + uint8_t* headerSizePtr; + uint8_t* pktStart = pkt; + + if (maxSzHbo > tBufSize_) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Attempting to header frame that is too large"); + } + + uint32_t szHbo; + uint32_t szNbo; + uint16_t headerSizeN; + + // Fixup szHbo later + pkt += sizeof(szNbo); + uint16_t headerN = htons(HEADER_MAGIC >> 16); + memcpy(pkt, &headerN, sizeof(headerN)); + pkt += sizeof(headerN); + uint16_t flagsN = htons(flags); + memcpy(pkt, &flagsN, sizeof(flagsN)); + pkt += sizeof(flagsN); + uint32_t seqIdN = htonl(seqId); + memcpy(pkt, &seqIdN, sizeof(seqIdN)); + pkt += sizeof(seqIdN); + headerSizePtr = pkt; + // Fixup headerSizeN later + pkt += sizeof(headerSizeN); + headerStart = pkt; + + pkt += writeVarint32(protoId, pkt); + pkt += writeVarint32(getNumTransforms(), pkt); + + // For now, each transform is only the ID, no following data. + for (vector<uint16_t>::const_iterator it = writeTrans_.begin(); it != writeTrans_.end(); ++it) { + pkt += writeVarint32(*it, pkt); + } + + // write info headers + + // for now only write kv-headers + uint16_t headerCount = writeHeaders_.size(); + if (headerCount > 0) { + pkt += writeVarint32(infoIdType::KEYVALUE, pkt); + // Write key-value headers count + pkt += writeVarint32(headerCount, pkt); + // Write info headers + map<string, string>::const_iterator it; + for (it = writeHeaders_.begin(); it != writeHeaders_.end(); ++it) { + writeString(pkt, it->first); // key + writeString(pkt, it->second); // value + } + writeHeaders_.clear(); + } + + // Fixups after varint size calculations + headerSize = (pkt - headerStart); + uint8_t padding = 4 - (headerSize % 4); + headerSize += padding; + + // Pad out pkt with 0x00 + for (int i = 0; i < padding; i++) { + *(pkt++) = 0x00; + } + + // Pkt size + szHbo = headerSize + haveBytes // thrift header + payload + + (headerStart - pktStart - 4); // common header section + headerSizeN = htons(headerSize / 4); + memcpy(headerSizePtr, &headerSizeN, sizeof(headerSizeN)); + + // Set framing size. + szNbo = htonl(szHbo); + memcpy(pktStart, &szNbo, sizeof(szNbo)); + + outTransport_->write(pktStart, szHbo - haveBytes + 4); + outTransport_->write(wBuf_.get(), haveBytes); + } else if (clientType == THRIFT_FRAMED_DEPRECATED) { + uint32_t szHbo = (uint32_t)haveBytes; + uint32_t szNbo = htonl(szHbo); + + outTransport_->write(reinterpret_cast<uint8_t*>(&szNbo), 4); + outTransport_->write(wBuf_.get(), haveBytes); + } else if (clientType == THRIFT_UNFRAMED_DEPRECATED) { + outTransport_->write(wBuf_.get(), haveBytes); + } else { + throw TTransportException(TTransportException::BAD_ARGS, "Unknown client type"); + } + + // Flush the underlying transport. + outTransport_->flush(); +} + +/** + * Read an i16 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 3 bytes. + */ +uint32_t THeaderTransport::readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary) { + int32_t val; + uint32_t rsize = readVarint32(ptr, &val, boundary); + *i16 = (int16_t)val; + return rsize; +} + +/** + * Read an i32 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 5 bytes. + */ +uint32_t THeaderTransport::readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary) { + + uint32_t rsize = 0; + uint32_t val = 0; + int shift = 0; + + while (true) { + if (ptr == boundary) { + throw TApplicationException(TApplicationException::INVALID_MESSAGE_TYPE, + "Trying to read past header boundary"); + } + uint8_t byte = *(ptr++); + rsize++; + val |= (uint64_t)(byte & 0x7f) << shift; + shift += 7; + if (!(byte & 0x80)) { + *i32 = val; + return rsize; + } + } +} + +/** + * Write an i32 as a varint. Results in 1-5 bytes on the wire. + */ +uint32_t THeaderTransport::writeVarint32(int32_t n, uint8_t* pkt) { + uint8_t buf[5]; + uint32_t wsize = 0; + + while (true) { + if ((n & ~0x7F) == 0) { + buf[wsize++] = (int8_t)n; + break; + } else { + buf[wsize++] = (int8_t)((n & 0x7F) | 0x80); + n >>= 7; + } + } + + // Caller will advance pkt. + for (uint32_t i = 0; i < wsize; i++) { + pkt[i] = buf[i]; + } + + return wsize; +} + +uint32_t THeaderTransport::writeVarint16(int16_t n, uint8_t* pkt) { + return writeVarint32(n, pkt); +} +} +} +} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/src/thrift/transport/THeaderTransport.h ---------------------------------------------------------------------- diff --git a/lib/cpp/src/thrift/transport/THeaderTransport.h b/lib/cpp/src/thrift/transport/THeaderTransport.h new file mode 100644 index 0000000..0cef56d --- /dev/null +++ b/lib/cpp/src/thrift/transport/THeaderTransport.h @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_ +#define THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 1 + +#include <thrift/protocol/TBinaryProtocol.h> +#include <thrift/protocol/TProtocolTypes.h> +#include <thrift/transport/TBufferTransports.h> +#include <thrift/transport/TTransport.h> +#include <thrift/transport/TVirtualTransport.h> + +#include <bitset> +#include <boost/scoped_array.hpp> +#include <pwd.h> +#include <unistd.h> + +// Don't include the unknown client. +#define CLIENT_TYPES_LEN 3 + +enum CLIENT_TYPE { + THRIFT_HEADER_CLIENT_TYPE = 0, + THRIFT_FRAMED_DEPRECATED = 1, + THRIFT_UNFRAMED_DEPRECATED = 2, + THRIFT_UNKNOWN_CLIENT_TYPE = 4, +}; + +namespace apache { +namespace thrift { +namespace transport { + +using apache::thrift::protocol::T_COMPACT_PROTOCOL; + +/** + * Header transport. All writes go into an in-memory buffer until flush is + * called, at which point the transport writes the length of the entire + * binary chunk followed by the data payload. This allows the receiver on the + * other end to always do fixed-length reads. + * + * Subclass TFramedTransport because most of the read/write methods are similar + * and need similar buffers. Major changes are readFrame & flush. + * + * Header Transport *must* be the same transport for both input and + * output when used on the server side - client responses should be + * the same protocol as those in the request. + */ +class THeaderTransport : public TVirtualTransport<THeaderTransport, TFramedTransport> { +public: + static const int DEFAULT_BUFFER_SIZE = 512u; + static const int THRIFT_MAX_VARINT32_BYTES = 5; + + /// Use default buffer sizes. + explicit THeaderTransport(const boost::shared_ptr<TTransport>& transport) + : transport_(transport), + outTransport_(transport), + protoId(T_COMPACT_PROTOCOL), + clientType(THRIFT_HEADER_CLIENT_TYPE), + seqId(0), + flags(0), + tBufSize_(0), + tBuf_(NULL) { + initBuffers(); + } + + THeaderTransport(const boost::shared_ptr<TTransport> inTransport, + const boost::shared_ptr<TTransport> outTransport) + : transport_(inTransport), + outTransport_(outTransport), + protoId(T_COMPACT_PROTOCOL), + clientType(THRIFT_HEADER_CLIENT_TYPE), + seqId(0), + flags(0), + tBufSize_(0), + tBuf_(NULL) { + initBuffers(); + } + + void open() { transport_->open(); } + + bool isOpen() { return transport_->isOpen(); } + + bool peek() { return (this->rBase_ < this->rBound_) || transport_->peek(); } + + void close() { + flush(); + transport_->close(); + } + + virtual uint32_t readSlow(uint8_t* buf, uint32_t len); + virtual uint32_t readAll(uint8_t* buf, uint32_t len); + virtual void flush(); + + void resizeTransformBuffer(uint32_t additionalSize = 0); + + boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } + + /* + * TVirtualTransport provides a default implementation of readAll(). + * We want to use the TBufferBase version instead. + */ + using TBufferBase::readAll; + + uint16_t getProtocolId() const; + void setProtocolId(uint16_t protoId) { this->protoId = protoId; } + + void resetProtocol(); + + /** + * We know we got a packet in header format here, try to parse the header + * + * @param headerSize size of the header portion + * @param sz Size of the whole message, including header + */ + void readHeaderFormat(uint16_t headerSize, uint32_t sz); + + /** + * Untransform the data based on the recieved header flags + * On conclusion of function, setReadBuffer is called with the + * untransformed data. + * + * @param ptr ptr to data + * @param size of data + */ + void untransform(uint8_t* ptr, uint32_t sz); + + /** + * Transform the data based on our write transform flags + * At conclusion of function the write buffer is set to the + * transformed data. + * + * @param ptr Ptr to data to transform + * @param sz Size of data buffer + */ + void transform(uint8_t* ptr, uint32_t sz); + + uint16_t getNumTransforms() const { + int trans = writeTrans_.size(); + return trans; + } + + void setTransform(uint16_t transId) { writeTrans_.push_back(transId); } + + // Info headers + + typedef std::map<std::string, std::string> StringToStringMap; + + // these work with write headers + void setHeader(const std::string& key, const std::string& value); + + void clearHeaders(); + + StringToStringMap& getWriteHeaders() { return writeHeaders_; } + + // these work with read headers + const StringToStringMap& getHeaders() const { return readHeaders_; } + + // accessors for seqId + int32_t getSequenceNumber() const { return seqId; } + void setSequenceNumber(int32_t seqId) { this->seqId = seqId; } + + enum TRANSFORMS { + ZLIB_TRANSFORM = 0x01, + }; + +protected: + std::bitset<CLIENT_TYPES_LEN> supported_clients; + + void initSupportedClients(std::bitset<CLIENT_TYPES_LEN> const*); + + /** + * Reads a frame of input from the underlying stream. + * + * Returns true if a frame was read successfully, or false on EOF. + * (Raises a TTransportException if EOF occurs after a partial frame.) + */ + bool readFrame(uint32_t minFrameSize); + + void ensureReadBuffer(uint32_t sz); + uint32_t getWriteBytes(); + + void initBuffers() { + setReadBuffer(NULL, 0); + setWriteBuffer(wBuf_.get(), wBufSize_); + } + + boost::shared_ptr<TTransport> transport_; + boost::shared_ptr<TTransport> outTransport_; + + // 0 and 16th bits must be 0 to differentiate from framed & unframed + static const uint32_t HEADER_MAGIC = 0x0FFF0000; + static const uint32_t HEADER_MASK = 0xFFFF0000; + static const uint32_t FLAGS_MASK = 0x0000FFFF; + + static const uint32_t MAX_FRAME_SIZE = 0x3FFFFFFF; + + int16_t protoId; + uint16_t clientType; + uint32_t seqId; + uint16_t flags; + + std::vector<uint16_t> readTrans_; + std::vector<uint16_t> writeTrans_; + + // Map to use for headers + StringToStringMap readHeaders_; + StringToStringMap writeHeaders_; + + /** + * Returns the maximum number of bytes that write k/v headers can take + */ + size_t getMaxWriteHeadersSize() const; + + struct infoIdType { + enum idType { + // start at 1 to avoid confusing header padding for an infoId + KEYVALUE = 1, + END // signal the end of infoIds we can handle + }; + }; + + // Buffers to use for transform processing + uint32_t tBufSize_; + boost::scoped_array<uint8_t> tBuf_; + + void readString(uint8_t*& ptr, /* out */ std::string& str, uint8_t const* headerBoundary); + + void writeString(uint8_t*& ptr, const std::string& str); + + // Varint utils + /** + * Read an i16 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 3 bytes. + */ + uint32_t readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary); + + /** + * Read an i32 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 5 bytes. + */ + uint32_t readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary); + + /** + * Write an i32 as a varint. Results in 1-5 bytes on the wire. + */ + uint32_t writeVarint32(int32_t n, uint8_t* pkt); + + /** + * Write an i16 as a varint. Results in 1-3 bytes on the wire. + */ + uint32_t writeVarint16(int16_t n, uint8_t* pkt); +}; + +/** + * Wraps a transport into a header one. + * + */ +class THeaderTransportFactory : public TTransportFactory { +public: + THeaderTransportFactory() {} + + virtual ~THeaderTransportFactory() {} + + /** + * Wraps the transport into a header one. + */ + virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { + return boost::shared_ptr<TTransport>(new THeaderTransport(trans)); + } +}; +} +} +} // apache::thrift::transport + +#endif // #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/lib/cpp/test/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt index 5de9fc4..033b4d2 100644 --- a/lib/cpp/test/CMakeLists.txt +++ b/lib/cpp/test/CMakeLists.txt @@ -21,6 +21,7 @@ set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono filesystem system thread unit_test_framework) include_directories(SYSTEM "${Boost_INCLUDE_DIRS}") +include_directories(SYSTEM "${ZLIB_INCLUDE_DIRS}") #Make sure gen-cpp files can be included include_directories("${CMAKE_CURRENT_BINARY_DIR}") @@ -62,6 +63,7 @@ add_executable(Benchmark Benchmark.cpp) target_link_libraries(Benchmark testgencpp) LINK_AGAINST_THRIFT_LIBRARY(Benchmark thrift) add_test(NAME Benchmark COMMAND Benchmark) +target_link_libraries(Benchmark testgencpp ${ZLIB_LIBRARIES}) set(UnitTest_SOURCES UnitTestMain.cpp @@ -79,7 +81,8 @@ if(NOT WITH_BOOSTTHREADS AND NOT WITH_STDTHREADS AND NOT MSVC) endif() add_executable(UnitTests ${UnitTest_SOURCES}) -target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES}) +target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(UnitTests thrift) add_test(NAME UnitTests COMMAND UnitTests) if ( MSVC ) @@ -99,6 +102,7 @@ add_executable(TInterruptTest ${TInterruptTest_SOURCES}) target_link_libraries(TInterruptTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TInterruptTest thrift) if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") @@ -110,6 +114,7 @@ add_executable(TServerIntegrationTest TServerIntegrationTest.cpp) target_link_libraries(TServerIntegrationTest testgencpp_cob ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TServerIntegrationTest thrift) if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") @@ -144,6 +149,7 @@ add_executable(EnumTest EnumTest.cpp) target_link_libraries(EnumTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(EnumTest thrift) add_test(NAME EnumTest COMMAND EnumTest) @@ -153,6 +159,7 @@ add_executable(TFileTransportTest TFileTransportTest.cpp) target_link_libraries(TFileTransportTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TFileTransportTest thrift) add_test(NAME TFileTransportTest COMMAND TFileTransportTest) @@ -161,6 +168,7 @@ endif() add_executable(TFDTransportTest TFDTransportTest.cpp) target_link_libraries(TFDTransportTest ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TFDTransportTest thrift) add_test(NAME TFDTransportTest COMMAND TFDTransportTest) @@ -168,6 +176,7 @@ add_test(NAME TFDTransportTest COMMAND TFDTransportTest) add_executable(TPipedTransportTest TPipedTransportTest.cpp) target_link_libraries(TPipedTransportTest ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TPipedTransportTest thrift) add_test(NAME TPipedTransportTest COMMAND TPipedTransportTest) @@ -182,6 +191,7 @@ add_executable(AllProtocolsTest ${AllProtocolsTest_SOURCES}) target_link_libraries(AllProtocolsTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(AllProtocolsTest thrift) add_test(NAME AllProtocolsTest COMMAND AllProtocolsTest) @@ -192,6 +202,7 @@ add_executable(DebugProtoTest DebugProtoTest.cpp) target_link_libraries(DebugProtoTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(DebugProtoTest thrift) add_test(NAME DebugProtoTest COMMAND DebugProtoTest) @@ -201,6 +212,7 @@ add_executable(JSONProtoTest JSONProtoTest.cpp) target_link_libraries(JSONProtoTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(JSONProtoTest thrift) add_test(NAME JSONProtoTest COMMAND JSONProtoTest) @@ -209,6 +221,7 @@ add_executable(OptionalRequiredTest OptionalRequiredTest.cpp) target_link_libraries(OptionalRequiredTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(OptionalRequiredTest thrift) add_test(NAME OptionalRequiredTest COMMAND OptionalRequiredTest) @@ -217,6 +230,7 @@ add_executable(RecursiveTest RecursiveTest.cpp) target_link_libraries(RecursiveTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(RecursiveTest thrift) add_test(NAME RecursiveTest COMMAND RecursiveTest) @@ -225,6 +239,7 @@ add_executable(SpecializationTest SpecializationTest.cpp) target_link_libraries(SpecializationTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(SpecializationTest thrift) add_test(NAME SpecializationTest COMMAND SpecializationTest) @@ -238,6 +253,7 @@ set(concurrency_test_SOURCES add_executable(concurrency_test ${concurrency_test_SOURCES}) LINK_AGAINST_THRIFT_LIBRARY(concurrency_test thrift) add_test(NAME concurrency_test COMMAND concurrency_test) +target_link_libraries(concurrency_test ${ZLIB_LIBRARIES}) set(link_test_SOURCES link/LinkTest.cpp @@ -249,6 +265,7 @@ set(link_test_SOURCES add_executable(link_test ${link_test_SOURCES}) target_link_libraries(link_test testgencpp_cob) LINK_AGAINST_THRIFT_LIBRARY(link_test thrift) +target_link_libraries(link_test testgencpp ${ZLIB_LIBRARIES}) add_test(NAME link_test COMMAND link_test) if(WITH_LIBEVENT) @@ -264,6 +281,7 @@ add_executable(processor_test ${processor_test_SOURCES}) target_link_libraries(processor_test testgencpp_cob ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(processor_test thrift) LINK_AGAINST_THRIFT_LIBRARY(processor_test thriftnb) @@ -276,6 +294,7 @@ target_link_libraries(TNonblockingServerTest testgencpp_cob ${LIBEVENT_LIBRARIES} ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(TNonblockingServerTest thrift) LINK_AGAINST_THRIFT_LIBRARY(TNonblockingServerTest thriftnb) @@ -287,6 +306,7 @@ add_executable(OpenSSLManualInitTest OpenSSLManualInitTest.cpp) target_link_libraries(OpenSSLManualInitTest ${OPENSSL_LIBRARIES} ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(OpenSSLManualInitTest thrift) add_test(NAME OpenSSLManualInitTest COMMAND OpenSSLManualInitTest) @@ -295,6 +315,7 @@ add_executable(SecurityTest SecurityTest.cpp) target_link_libraries(SecurityTest testgencpp ${Boost_LIBRARIES} + ${ZLIB_LIBRARIES} ) LINK_AGAINST_THRIFT_LIBRARY(SecurityTest thrift) if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin") http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/test/cpp/CMakeLists.txt b/test/cpp/CMakeLists.txt index 2d75f2e..67d9510 100755 --- a/test/cpp/CMakeLists.txt +++ b/test/cpp/CMakeLists.txt @@ -54,23 +54,24 @@ add_library(crossstressgencpp STATIC ${crossstressgencpp_SOURCES}) LINK_AGAINST_THRIFT_LIBRARY(crossstressgencpp thrift) add_executable(TestServer src/TestServer.cpp) -target_link_libraries(TestServer crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +target_link_libraries(TestServer crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(TestServer thrift) LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftnb) +LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftnb) add_executable(TestClient src/TestClient.cpp) -target_link_libraries(TestClient crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +target_link_libraries(TestClient crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(TestClient thrift) LINK_AGAINST_THRIFT_LIBRARY(TestClient thriftnb) add_executable(StressTest src/StressTest.cpp) -target_link_libraries(StressTest crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +target_link_libraries(StressTest crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(StressTest thrift) LINK_AGAINST_THRIFT_LIBRARY(StressTest thriftnb) add_test(NAME StressTest COMMAND StressTest) add_executable(StressTestNonBlocking src/StressTestNonBlocking.cpp) -target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB}) +target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES}) LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thrift) LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftnb) LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftz) http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/cpp/Makefile.am ---------------------------------------------------------------------- diff --git a/test/cpp/Makefile.am b/test/cpp/Makefile.am index bc026b2..c609a71 100755 --- a/test/cpp/Makefile.am +++ b/test/cpp/Makefile.am @@ -69,7 +69,7 @@ TestServer_LDADD = \ $(top_builddir)/lib/cpp/libthrift.la \ $(top_builddir)/lib/cpp/libthriftz.la \ $(top_builddir)/lib/cpp/libthriftnb.la \ - -levent -lboost_program_options -lboost_system -lboost_filesystem + -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS) TestClient_SOURCES = \ src/TestClient.cpp @@ -79,7 +79,7 @@ TestClient_LDADD = \ $(top_builddir)/lib/cpp/libthrift.la \ $(top_builddir)/lib/cpp/libthriftz.la \ $(top_builddir)/lib/cpp/libthriftnb.la \ - -levent -lboost_program_options -lboost_system -lboost_filesystem + -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS) StressTest_SOURCES = \ src/StressTest.cpp http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/cpp/src/TestClient.cpp ---------------------------------------------------------------------- diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp index 2736ee8..47539dc 100644 --- a/test/cpp/src/TestClient.cpp +++ b/test/cpp/src/TestClient.cpp @@ -23,6 +23,7 @@ #include <iostream> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TCompactProtocol.h> +#include <thrift/protocol/THeaderProtocol.h> #include <thrift/protocol/TJSONProtocol.h> #include <thrift/transport/THttpClient.h> #include <thrift/transport/TTransportUtils.h> @@ -170,7 +171,7 @@ int main(int argc, char** argv) { "Transport: buffered, framed, http, evhttp")( "protocol", boost::program_options::value<string>(&protocol_type)->default_value(protocol_type), - "Protocol: binary, compact, json")("ssl", "Encrypted Transport using SSL")( + "Protocol: binary, header, compact, json")("ssl", "Encrypted Transport using SSL")( "testloops,n", boost::program_options::value<int>(&numTests)->default_value(numTests), "Number of Tests")("noinsane", "Do not run insanity test"); @@ -188,6 +189,7 @@ int main(int argc, char** argv) { if (!protocol_type.empty()) { if (protocol_type == "binary") { } else if (protocol_type == "compact") { + } else if (protocol_type == "header") { } else if (protocol_type == "json") { } else { throw invalid_argument("Unknown protocol type " + protocol_type); @@ -266,6 +268,9 @@ int main(int argc, char** argv) { } else if (protocol_type.compare("compact") == 0) { boost::shared_ptr<TProtocol> compactProtocol(new TCompactProtocol(transport)); protocol = compactProtocol; + } else if (protocol_type == "header") { + boost::shared_ptr<TProtocol> headerProtocol(new THeaderProtocol(transport)); + protocol = headerProtocol; } else { boost::shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport)); protocol = binaryProtocol; http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/cpp/src/TestServer.cpp ---------------------------------------------------------------------- diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp index 51169af..12c4b97 100644 --- a/test/cpp/src/TestServer.cpp +++ b/test/cpp/src/TestServer.cpp @@ -24,6 +24,7 @@ #include <thrift/concurrency/PlatformThreadFactory.h> #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/protocol/TCompactProtocol.h> +#include <thrift/protocol/THeaderProtocol.h> #include <thrift/protocol/TJSONProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/server/TThreadedServer.h> @@ -567,7 +568,7 @@ int main(int argc, char** argv) { "transport: buffered, framed, http")( "protocol", boost::program_options::value<string>(&protocol_type)->default_value(protocol_type), - "protocol: binary, compact, json")("ssl", "Encrypted Transport using SSL")( + "protocol: binary, compact, header, json")("ssl", "Encrypted Transport using SSL")( "processor-events", "processor-events")("workers,n", boost::program_options::value<size_t>(&workers)->default_value(workers), @@ -597,6 +598,7 @@ int main(int argc, char** argv) { if (protocol_type == "binary") { } else if (protocol_type == "compact") { } else if (protocol_type == "json") { + } else if (protocol_type == "header") { } else { throw invalid_argument("Unknown protocol type " + protocol_type); } @@ -633,6 +635,9 @@ int main(int argc, char** argv) { } else if (protocol_type == "compact") { boost::shared_ptr<TProtocolFactory> compactProtocolFactory(new TCompactProtocolFactory()); protocolFactory = compactProtocolFactory; + } else if (protocol_type == "header") { + boost::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory()); + protocolFactory = headerProtocolFactory; } else { boost::shared_ptr<TProtocolFactory> binaryProtocolFactory( new TBinaryProtocolFactoryT<TBufferBase>()); @@ -739,11 +744,16 @@ int main(int argc, char** argv) { TEvhttpServer nonblockingServer(testBufferProcessor, port); nonblockingServer.serve(); } else { - server.reset(new TNonblockingServer(testProcessor, port)); + server.reset(new TNonblockingServer(testProcessor, protocolFactory, port)); } } if (server.get() != NULL) { + if (protocol_type == "header") { + // Tell the server to use the same protocol for input / output + // if using header + server->setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory>()); + } apache::thrift::concurrency::PlatformThreadFactory factory; factory.setDetached(false); boost::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server); http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/known_failures_Linux.json ---------------------------------------------------------------------- diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json index a20209b..4c78c52 100644 --- a/test/known_failures_Linux.json +++ b/test/known_failures_Linux.json @@ -11,6 +11,8 @@ "c_glib-rb_binary_framed-ip", "cpp-cpp_binary_http-ip", "cpp-cpp_compact_http-domain", + "cpp-cpp_header_http-domain", + "cpp-cpp_header_http-ip", "cpp-cpp_json_http-ip", "cpp-hs_json_buffered-ip", "cpp-hs_json_framed-ip", http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/test/tests.json ---------------------------------------------------------------------- diff --git a/test/tests.json b/test/tests.json index 44040f2..aeff933 100644 --- a/test/tests.json +++ b/test/tests.json @@ -228,7 +228,8 @@ "protocols": [ "compact", "binary", - "json" + "json", + "header" ], "workdir": "cpp" }, http://git-wip-us.apache.org/repos/asf/thrift/blob/792db4e9/tutorial/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/tutorial/cpp/CMakeLists.txt b/tutorial/cpp/CMakeLists.txt index 2b0c143..42d92ff 100644 --- a/tutorial/cpp/CMakeLists.txt +++ b/tutorial/cpp/CMakeLists.txt @@ -45,7 +45,9 @@ add_custom_command(OUTPUT gen-cpp/Calculator.cpp gen-cpp/SharedService.cpp gen-c add_executable(TutorialServer CppServer.cpp) target_link_libraries(TutorialServer tutorialgencpp) LINK_AGAINST_THRIFT_LIBRARY(TutorialServer thrift) +target_link_libraries(TutorialServer ${ZLIB_LIBRARIES}) add_executable(TutorialClient CppClient.cpp) target_link_libraries(TutorialClient tutorialgencpp) LINK_AGAINST_THRIFT_LIBRARY(TutorialClient thrift) +target_link_libraries(TutorialClient ${ZLIB_LIBRARIES})
