Github user ben-craig commented on a diff in the pull request: https://github.com/apache/thrift/pull/357#discussion_r23507741 --- Diff: lib/cpp/src/thrift/transport/THeaderTransport.cpp --- @@ -0,0 +1,606 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <thrift/transport/THeaderTransport.h> +#include <thrift/TApplicationException.h> +#include <thrift/protocol/TProtocolTypes.h> + +#include <algorithm> +#include <bitset> +#include <cassert> +#include <string> +#include <zlib.h> + +using std::map; +using boost::shared_ptr; +using std::string; +using std::vector; + +namespace apache { namespace thrift { namespace transport { + +using namespace apache::thrift::protocol; +using apache::thrift::protocol::TBinaryProtocol; + + +uint32_t THeaderTransport::readAll(uint8_t* buf, uint32_t len) { + // We want to call TBufferBase's version here, because + // TFramedTransport would try and call its own readFrame function + return TBufferBase::readAll(buf, len); +} + +uint32_t THeaderTransport::readSlow(uint8_t* buf, uint32_t len) { + + if (clientType == THRIFT_UNFRAMED_DEPRECATED) { + return transport_->read(buf, len); + } + + return TFramedTransport::readSlow(buf, len); +} + +uint16_t THeaderTransport::getProtocolId() const { + if (clientType == THRIFT_HEADER_CLIENT_TYPE) { + return protoId; + } else { + return T_BINARY_PROTOCOL; // Assume other transports use TBinary + } +} + +void THeaderTransport::allocateReadBuffer(uint32_t sz) { + if (sz > rBufSize_) { + rBuf_.reset(new uint8_t[sz]); + rBufSize_ = sz; + } +} + +bool THeaderTransport::readFrame(uint32_t minFrameSize) { + // szN is network byte order of sz + uint32_t szN; + uint32_t sz; + + // Read the size of the next frame. + // We can't use readAll(&sz, sizeof(sz)), since that always throws an + // exception on EOF. We want to throw an exception only if EOF occurs after + // partial size data. + uint32_t sizeBytesRead = 0; + while (sizeBytesRead < sizeof(szN)) { + uint8_t* szp = reinterpret_cast<uint8_t*>(&szN) + sizeBytesRead; + uint32_t bytesRead = transport_->read(szp, sizeof(szN) - sizeBytesRead); + if (bytesRead == 0) { + if (sizeBytesRead == 0) { + // EOF before any data was read. + return false; + } else { + // EOF after a partial frame header. 
Raise an exception. + throw TTransportException(TTransportException::END_OF_FILE, + "No more data to read after " + "partial frame header."); + } + } + sizeBytesRead += bytesRead; + } + + sz = ntohl(szN); + + allocateReadBuffer(minFrameSize + 4); + + if ((sz & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) { + // unframed + clientType = THRIFT_UNFRAMED_DEPRECATED; + memcpy(rBuf_.get(), &szN, sizeof(szN)); + if (minFrameSize > 4) { + transport_->readAll(rBuf_.get() + 4, minFrameSize - 4); + setReadBuffer(rBuf_.get(), minFrameSize); + } else { + setReadBuffer(rBuf_.get(), 4); + } + } else { + // Could be header format or framed. Check next uint32 + uint32_t magic_n; + uint32_t magic; + + if (sz > MAX_FRAME_SIZE) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header transport frame is too large"); + } + + allocateReadBuffer(sz); + + // We can use readAll here, because it would be an invalid frame otherwise + transport_->readAll(reinterpret_cast<uint8_t*>(&magic_n), sizeof(magic_n)); + memcpy(rBuf_.get(), &magic_n, sizeof(magic_n)); + magic = ntohl(magic_n); + + if ((magic & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) { + // framed + clientType = THRIFT_FRAMED_DEPRECATED; + transport_->readAll(rBuf_.get() + 4, sz - 4); + setReadBuffer(rBuf_.get(), sz); + } else if (HEADER_MAGIC == (magic & HEADER_MASK)) { + if (sz < 10) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header transport frame is too small"); + } + + transport_->readAll(rBuf_.get() + 4, sz - 4); + + // header format + clientType = THRIFT_HEADER_CLIENT_TYPE; + // flags + flags = magic & FLAGS_MASK; + // seqId + uint32_t seqId_n; + memcpy(&seqId_n, rBuf_.get() + 4, sizeof(seqId_n)); + seqId = ntohl(seqId_n); + // header size + uint16_t headerSize_n; + memcpy(&headerSize_n, rBuf_.get() + 8, sizeof(headerSize_n)); + uint16_t headerSize = ntohs(headerSize_n); + setReadBuffer(rBuf_.get(), sz); + 
readHeaderFormat(headerSize, sz); + } else { + clientType = THRIFT_UNKNOWN_CLIENT_TYPE; + throw TTransportException(TTransportException::BAD_ARGS, + "Could not detect client transport type"); + } + } + + + return true; +} + + +/** + * Reads a string from ptr, taking care not to reach headerBoundary + * Advances ptr on success + * + * @param str output string + * @throws CORRUPTED_DATA if size of string exceeds boundary + */ +void THeaderTransport::readString(uint8_t* &ptr, /* out */ string &str, + uint8_t const* headerBoundary) { + int32_t strLen; + + uint32_t bytes = readVarint32(ptr, &strLen, headerBoundary); + if (ptr + strLen > headerBoundary) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Info header length exceeds header size"); + } + ptr += bytes; + str.assign(reinterpret_cast<const char*>(ptr), strLen); + ptr += strLen; +} + +void THeaderTransport::readHeaderFormat(uint16_t headerSize, uint32_t sz) { + readTrans_.clear(); // Clear out any previous transforms. + readHeaders_.clear(); // Clear out any previous headers. + + // skip over already processed magic(4), seqId(4), headerSize(2) + uint8_t* ptr = reinterpret_cast<uint8_t*>(rBuf_.get() + 10); + headerSize *= 4; + const uint8_t* const headerBoundary = ptr + headerSize; + if (headerSize > sz) { + throw TTransportException(TTransportException::CORRUPTED_DATA, + "Header size is larger than frame"); + } + uint8_t* data = ptr + headerSize; + ptr += readVarint16(ptr, &protoId, headerBoundary); --- End diff -- I'm not thrilled with VarInts here. I suspect that for the Facebook use case, you are stuck with them, because you are probably already running lots of services with VarInts in the headers. Why am I not fond of VarInts? They save space on the wire, so that must be good, right? Well, for most use cases, the slow part of networking is because of latency, not throughput. Stated another way, round trips are expensive, but extra bytes in the same round trip tend not to be. 
On a gigabit network, a byte costs roughly 8 nanoseconds on the wire, and a VarInt16 saves only one byte in its best case. So let's translate this into processor time. Assume a 3 GHz processor: 3 cycles fit into one nanosecond, so those 8 nanoseconds are worth about 24 cycles. Does the entirety of the code required to deal with VarInts in the good case fit in 24 cycles? If it takes more cycles, then you have slowed things down. Now, this is a gross simplification in many ways, as it doesn't account for caches or for the work that the network stacks have to perform to send one more byte... but it does mean that VarInts aren't a clear win. Now maybe this doesn't apply to Facebook. I don't know whether they are more bandwidth-bound or CPU-bound, and I don't know whether CPU time or network time consumes more power. Either way, though, I am skeptical of this design choice. Maybe we can get the best for Facebook and the best for Apache by using a different version / magic number to distinguish the cases. Or maybe we call this TCompactHeader and something else TBinaryHeader.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---