This is an automated email from the ASF dual-hosted git repository. tmarshall pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 9e13fd7de53978f7ef8543a3661ce70c5ed4d60a Author: Thomas Tauber-Marshall <[email protected]> AuthorDate: Wed May 8 12:33:54 2019 -0700 IMPALA-8538 (part 1) Copied THttp(Server|Transport) from thrift-0.9.3 This is a mechanical change that just copies several files over from thrift. This is for convenience in reviewing changes to these files, which have been submitted as a follow up patch. Change-Id: I1916e17eaeb7854eb93c2415396f0ee0243e4e32 Reviewed-on: http://gerrit.cloudera.org:8080/13298 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/transport/CMakeLists.txt | 4 +- be/src/transport/THttpServer.cpp | 164 ++++++++++++++++++++++ be/src/transport/THttpServer.h | 64 +++++++++ be/src/transport/THttpTransport.cpp | 267 ++++++++++++++++++++++++++++++++++++ be/src/transport/THttpTransport.h | 104 ++++++++++++++ 5 files changed, 602 insertions(+), 1 deletion(-) diff --git a/be/src/transport/CMakeLists.txt b/be/src/transport/CMakeLists.txt index 8a9eda8..59fca8b 100644 --- a/be/src/transport/CMakeLists.txt +++ b/be/src/transport/CMakeLists.txt @@ -24,10 +24,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/transport") add_library(ThriftSaslTransport + THttpServer.cpp + THttpTransport.cpp TSaslClientTransport.cpp TSasl.cpp TSaslServerTransport.cpp TSaslTransport.cpp undef.cpp ) -add_dependencies(ThriftSaslTransport gen-deps) \ No newline at end of file +add_dependencies(ThriftSaslTransport gen-deps) diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp new file mode 100644 index 0000000..a20d612 --- /dev/null +++ b/be/src/transport/THttpServer.cpp @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <cstdlib> +#include <sstream> +#include <iostream> + +#include <thrift/transport/THttpServer.h> +#include <thrift/transport/TSocket.h> +#ifdef _MSC_VER +#include <Shlwapi.h> +#endif + +namespace apache { +namespace thrift { +namespace transport { + +using namespace std; + +THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : THttpTransport(transport) { +} + +THttpServer::~THttpServer() { +} + +#ifdef _MSC_VER + #define THRIFT_strncasecmp(str1, str2, len) _strnicmp(str1, str2, len) + #define THRIFT_strcasestr(haystack, needle) StrStrIA(haystack, needle) +#else + #define THRIFT_strncasecmp(str1, str2, len) strncasecmp(str1, str2, len) + #define THRIFT_strcasestr(haystack, needle) strcasestr(haystack, needle) +#endif + +void THttpServer::parseHeader(char* header) { + char* colon = strchr(header, ':'); + if (colon == NULL) { + return; + } + size_t sz = colon - header; + char* value = colon + 1; + + if (THRIFT_strncasecmp(header, "Transfer-Encoding", sz) == 0) { + if (THRIFT_strcasestr(value, "chunked") != NULL) { + chunked_ = true; + } + } else if (THRIFT_strncasecmp(header, "Content-length", sz) == 0) { + chunked_ = false; + contentLength_ = atoi(value); + } else if (strncmp(header, "X-Forwarded-For", sz) == 0) { + origin_ = value; + } +} + +bool THttpServer::parseStatusLine(char* status) { + char* method = status; + + char* path = strchr(method, ' '); + if 
(path == NULL) { + throw TTransportException(string("Bad Status: ") + status); + } + + *path = '\0'; + while (*(++path) == ' ') { + }; + + char* http = strchr(path, ' '); + if (http == NULL) { + throw TTransportException(string("Bad Status: ") + status); + } + *http = '\0'; + + if (strcmp(method, "POST") == 0) { + // POST method ok, looking for content. + return true; + } else if (strcmp(method, "OPTIONS") == 0) { + // preflight OPTIONS method, we don't need further content. + // how to graciously close connection? + uint8_t* buf; + uint32_t len; + writeBuffer_.getBuffer(&buf, &len); + + // Construct the HTTP header + std::ostringstream h; + h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF + << "Access-Control-Allow-Origin: *" << CRLF << "Access-Control-Allow-Methods: POST, OPTIONS" + << CRLF << "Access-Control-Allow-Headers: Content-Type" << CRLF << CRLF; + string header = h.str(); + + // Write the header, then the data, then flush + transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size())); + transport_->write(buf, len); + transport_->flush(); + + // Reset the buffer and header variables + writeBuffer_.resetBuffer(); + readHeaders_ = true; + return true; + } + throw TTransportException(string("Bad Status (unsupported method): ") + status); +} + +void THttpServer::flush() { + // Fetch the contents of the write buffer + uint8_t* buf; + uint32_t len; + writeBuffer_.getBuffer(&buf, &len); + + // Construct the HTTP header + std::ostringstream h; + h << "HTTP/1.1 200 OK" << CRLF << "Date: " << getTimeRFC1123() << CRLF << "Server: Thrift/" + << VERSION << CRLF << "Access-Control-Allow-Origin: *" << CRLF + << "Content-Type: application/x-thrift" << CRLF << "Content-Length: " << len << CRLF + << "Connection: Keep-Alive" << CRLF << CRLF; + string header = h.str(); + + // Write the header, then the data, then flush + // cast should be fine, because none of "header" is under attacker control + transport_->write((const 
uint8_t*)header.c_str(), static_cast<uint32_t>(header.size())); + transport_->write(buf, len); + transport_->flush(); + + // Reset the buffer and header variables + writeBuffer_.resetBuffer(); + readHeaders_ = true; +} + +std::string THttpServer::getTimeRFC1123() { + static const char* Days[] = {"Sun", "Mon", "Tue", "Wed", "Thu", "Fri", "Sat"}; + static const char* Months[] + = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}; + char buff[128]; + time_t t = time(NULL); + tm* broken_t = gmtime(&t); + + sprintf(buff, + "%s, %d %s %d %d:%d:%d GMT", + Days[broken_t->tm_wday], + broken_t->tm_mday, + Months[broken_t->tm_mon], + broken_t->tm_year + 1900, + broken_t->tm_hour, + broken_t->tm_min, + broken_t->tm_sec); + return std::string(buff); +} +} +} +} // apache::thrift::transport diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h new file mode 100644 index 0000000..a7ab944 --- /dev/null +++ b/be/src/transport/THttpServer.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
+#define _THRIFT_TRANSPORT_THTTPSERVER_H_ 1
+
+#include <thrift/transport/THttpTransport.h>
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+/**
+ * Server side of the HTTP transport: supplies the request-line and header
+ * parsing hooks that THttpTransport leaves pure virtual, and implements
+ * flush() by sending the buffered payload as an HTTP/1.1 200 response.
+ */
+class THttpServer : public THttpTransport {
+public:
+  /** Wraps |transport|, which carries the raw HTTP bytes. */
+  THttpServer(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpServer();
+
+  /**
+   * Writes the response header plus everything previously passed to write()
+   * to the underlying transport and flushes it.
+   */
+  virtual void flush();
+
+protected:
+  // NOTE(review): THttpServer.cpp in this change does not define this member;
+  // the base class has its own non-virtual readHeaders(). Confirm whether this
+  // declaration is still needed.
+  void readHeaders();
+  /** Handles one "Name: value" header line; see THttpServer.cpp for the names recognized. */
+  virtual void parseHeader(char* header);
+  /**
+   * Parses the request line in place. Returns true for POST and OPTIONS and
+   * throws TTransportException for a malformed line or any other method.
+   */
+  virtual bool parseStatusLine(char* status);
+  /** Returns the current GMT time as text for the "Date:" response header. */
+  std::string getTimeRFC1123();
+};
+
+/**
+ * Wraps a transport into HTTP protocol
+ */
+class THttpServerTransportFactory : public TTransportFactory {
+public:
+  THttpServerTransportFactory() {}
+
+  virtual ~THttpServerTransportFactory() {}
+
+  /**
+   * Wraps the transport in a newly allocated THttpServer.
+   */
+  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+    return boost::shared_ptr<TTransport>(new THttpServer(trans));
+  }
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPSERVER_H_
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
new file mode 100644
index 0000000..a466ff6
--- /dev/null
+++ b/be/src/transport/THttpTransport.cpp
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <sstream> + +#include <thrift/transport/THttpTransport.h> + +namespace apache { +namespace thrift { +namespace transport { + +using namespace std; + +// Yeah, yeah, hacky to put these here, I know. +const char* THttpTransport::CRLF = "\r\n"; +const int THttpTransport::CRLF_LEN = 2; + +THttpTransport::THttpTransport(boost::shared_ptr<TTransport> transport) + : transport_(transport), + origin_(""), + readHeaders_(true), + chunked_(false), + chunkedDone_(false), + chunkSize_(0), + contentLength_(0), + httpBuf_(NULL), + httpPos_(0), + httpBufLen_(0), + httpBufSize_(1024) { + init(); +} + +void THttpTransport::init() { + httpBuf_ = (char*)std::malloc(httpBufSize_ + 1); + if (httpBuf_ == NULL) { + throw std::bad_alloc(); + } + httpBuf_[httpBufLen_] = '\0'; +} + +THttpTransport::~THttpTransport() { + if (httpBuf_ != NULL) { + std::free(httpBuf_); + } +} + +uint32_t THttpTransport::read(uint8_t* buf, uint32_t len) { + if (readBuffer_.available_read() == 0) { + readBuffer_.resetBuffer(); + uint32_t got = readMoreData(); + if (got == 0) { + return 0; + } + } + return readBuffer_.read(buf, len); +} + +uint32_t THttpTransport::readEnd() { + // Read any pending chunked data (footers etc.) + if (chunked_) { + while (!chunkedDone_) { + readChunked(); + } + } + return 0; +} + +uint32_t THttpTransport::readMoreData() { + uint32_t size; + + // Get more data! 
+ refill(); + + if (readHeaders_) { + readHeaders(); + } + + if (chunked_) { + size = readChunked(); + } else { + size = readContent(contentLength_); + readHeaders_ = true; + } + + return size; +} + +uint32_t THttpTransport::readChunked() { + uint32_t length = 0; + + char* line = readLine(); + uint32_t chunkSize = parseChunkSize(line); + if (chunkSize == 0) { + readChunkedFooters(); + } else { + // Read data content + length += readContent(chunkSize); + // Read trailing CRLF after content + readLine(); + } + return length; +} + +void THttpTransport::readChunkedFooters() { + // End of data, read footer lines until a blank one appears + while (true) { + char* line = readLine(); + if (strlen(line) == 0) { + chunkedDone_ = true; + break; + } + } +} + +uint32_t THttpTransport::parseChunkSize(char* line) { + char* semi = strchr(line, ';'); + if (semi != NULL) { + *semi = '\0'; + } + uint32_t size = 0; + sscanf(line, "%x", &size); + return size; +} + +uint32_t THttpTransport::readContent(uint32_t size) { + uint32_t need = size; + while (need > 0) { + uint32_t avail = httpBufLen_ - httpPos_; + if (avail == 0) { + // We have given all the data, reset position to head of the buffer + httpPos_ = 0; + httpBufLen_ = 0; + refill(); + + // Now have available however much we read + avail = httpBufLen_; + } + uint32_t give = avail; + if (need < give) { + give = need; + } + readBuffer_.write((uint8_t*)(httpBuf_ + httpPos_), give); + httpPos_ += give; + need -= give; + } + return size; +} + +char* THttpTransport::readLine() { + while (true) { + char* eol = NULL; + + eol = strstr(httpBuf_ + httpPos_, CRLF); + + // No CRLF yet? 
+ if (eol == NULL) { + // Shift whatever we have now to front and refill + shift(); + refill(); + } else { + // Return pointer to next line + *eol = '\0'; + char* line = httpBuf_ + httpPos_; + httpPos_ = static_cast<uint32_t>((eol - httpBuf_) + CRLF_LEN); + return line; + } + } +} + +void THttpTransport::shift() { + if (httpBufLen_ > httpPos_) { + // Shift down remaining data and read more + uint32_t length = httpBufLen_ - httpPos_; + memmove(httpBuf_, httpBuf_ + httpPos_, length); + httpBufLen_ = length; + } else { + httpBufLen_ = 0; + } + httpPos_ = 0; + httpBuf_[httpBufLen_] = '\0'; +} + +void THttpTransport::refill() { + uint32_t avail = httpBufSize_ - httpBufLen_; + if (avail <= (httpBufSize_ / 4)) { + httpBufSize_ *= 2; + httpBuf_ = (char*)std::realloc(httpBuf_, httpBufSize_ + 1); + if (httpBuf_ == NULL) { + throw std::bad_alloc(); + } + } + + // Read more data + uint32_t got = transport_->read((uint8_t*)(httpBuf_ + httpBufLen_), httpBufSize_ - httpBufLen_); + httpBufLen_ += got; + httpBuf_[httpBufLen_] = '\0'; + + if (got == 0) { + throw TTransportException("Could not refill buffer"); + } +} + +void THttpTransport::readHeaders() { + // Initialize headers state variables + contentLength_ = 0; + chunked_ = false; + chunkedDone_ = false; + chunkSize_ = 0; + + // Control state flow + bool statusLine = true; + bool finished = false; + + // Loop until headers are finished + while (true) { + char* line = readLine(); + + if (strlen(line) == 0) { + if (finished) { + readHeaders_ = false; + return; + } else { + // Must have been an HTTP 100, keep going for another status line + statusLine = true; + } + } else { + if (statusLine) { + statusLine = false; + finished = parseStatusLine(line); + } else { + parseHeader(line); + } + } + } +} + +void THttpTransport::write(const uint8_t* buf, uint32_t len) { + writeBuffer_.write(buf, len); +} + +const std::string THttpTransport::getOrigin() { + std::ostringstream oss; + if (!origin_.empty()) { + oss << origin_ << ", "; + } + 
oss << transport_->getOrigin(); + return oss.str(); +} +} +} +} diff --git a/be/src/transport/THttpTransport.h b/be/src/transport/THttpTransport.h new file mode 100644 index 0000000..a9f564c --- /dev/null +++ b/be/src/transport/THttpTransport.h @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ +#define _THRIFT_TRANSPORT_THTTPTRANSPORT_H_ 1 + +#include <thrift/transport/TBufferTransports.h> +#include <thrift/transport/TVirtualTransport.h> + +namespace apache { +namespace thrift { +namespace transport { + +/** + * HTTP implementation of the thrift transport. This was irritating + * to write, but the alternatives in C++ land are daunting. Linking CURL + * requires 23 dynamic libraries last time I checked (WTF?!?). All we have + * here is a VERY basic HTTP/1.1 client which supports HTTP 100 Continue, + * chunked transfer encoding, keepalive, etc. Tested against Apache. 
+ */
+class THttpTransport : public TVirtualTransport<THttpTransport> {
+public:
+  /** Wraps |transport|, which carries the raw HTTP bytes. */
+  THttpTransport(boost::shared_ptr<TTransport> transport);
+
+  virtual ~THttpTransport();
+
+  // open/isOpen/peek/close simply forward to the wrapped transport.
+  void open() { transport_->open(); }
+
+  bool isOpen() { return transport_->isOpen(); }
+
+  bool peek() { return transport_->peek(); }
+
+  void close() { transport_->close(); }
+
+  /** Copies up to |len| bytes of decoded message body into |buf|. */
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  /** Drains the remainder of a chunked message; always returns 0. */
+  uint32_t readEnd();
+
+  /** Appends |len| bytes to the write buffer; nothing is sent until flush(). */
+  void write(const uint8_t* buf, uint32_t len);
+
+  /** Subclasses send the buffered payload with the HTTP framing they need. */
+  virtual void flush() = 0;
+
+  /** Returns origin_ (if set) joined with the wrapped transport's origin. */
+  virtual const std::string getOrigin();
+
+protected:
+  // Wrapped transport that carries the raw HTTP bytes.
+  boost::shared_ptr<TTransport> transport_;
+  // Extra origin text reported by getOrigin(); empty unless a subclass sets it.
+  std::string origin_;
+
+  // Payload accumulated by write() until flush().
+  TMemoryBuffer writeBuffer_;
+  // Decoded message body waiting to be handed out by read().
+  TMemoryBuffer readBuffer_;
+
+  // True when the next read must start by parsing a status line and headers.
+  bool readHeaders_;
+  // True when the current message uses chunked transfer encoding.
+  bool chunked_;
+  // Set once the terminating chunk's footers have been consumed.
+  bool chunkedDone_;
+  // Reset by readHeaders(); not otherwise used in THttpTransport.cpp.
+  uint32_t chunkSize_;
+  // Body length of the current non-chunked message.
+  uint32_t contentLength_;
+
+  // Raw buffer of httpBufSize_ + 1 bytes, allocated with malloc and freed in
+  // the destructor.
+  // NOTE(review): copy construction/assignment are not disabled here, so a
+  // copy would share this pointer - confirm the class is never copied.
+  char* httpBuf_;
+  // Offset of the first unconsumed byte in httpBuf_.
+  uint32_t httpPos_;
+  // Number of valid bytes in httpBuf_; httpBuf_[httpBufLen_] is kept NUL.
+  uint32_t httpBufLen_;
+  // Capacity of httpBuf_, not counting the extra byte for the NUL terminator.
+  uint32_t httpBufSize_;
+
+  // Allocates httpBuf_. Called from the constructor, so an override in a
+  // derived class is not reached from there.
+  virtual void init();
+
+  uint32_t readMoreData();
+  char* readLine();
+
+  void readHeaders();
+  // Called by readHeaders() for every header line after the status line.
+  virtual void parseHeader(char* header) = 0;
+  // Called by readHeaders() for a status line; a true result means the
+  // header section that follows is the final one for this message.
+  virtual bool parseStatusLine(char* status) = 0;
+
+  uint32_t readChunked();
+  void readChunkedFooters();
+  uint32_t parseChunkSize(char* line);
+
+  uint32_t readContent(uint32_t size);
+
+  void refill();
+  void shift();
+
+  // Line terminator "\r\n" and its length; defined in THttpTransport.cpp.
+  static const char* CRLF;
+  static const int CRLF_LEN;
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_THTTPTRANSPORT_H_
