Repository: trafficserver Updated Branches: refs/heads/master faf38a5e0 -> 148b47455
atscppapi: Added missing intercept files Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/51fc9ff9 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/51fc9ff9 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/51fc9ff9 Branch: refs/heads/master Commit: 51fc9ff92b41bec074613367c5a708103794be68 Parents: b4c3400 Author: Manjesh Nilange <[email protected]> Authored: Thu Mar 6 15:12:02 2014 -0800 Committer: Manjesh Nilange <[email protected]> Committed: Thu Mar 6 15:12:02 2014 -0800 ---------------------------------------------------------------------- lib/atscppapi/src/InterceptPlugin.cc | 295 +++++++++++++++++++ .../src/include/atscppapi/InterceptPlugin.h | 98 ++++++ 2 files changed, 393 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/51fc9ff9/lib/atscppapi/src/InterceptPlugin.cc ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/InterceptPlugin.cc b/lib/atscppapi/src/InterceptPlugin.cc new file mode 100644 index 0000000..dd7242f --- /dev/null +++ b/lib/atscppapi/src/InterceptPlugin.cc @@ -0,0 +1,295 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +/** + * @file InterceptPlugin.cc + */ + +#include "atscppapi/InterceptPlugin.h" + +#include <ts/ts.h> +#include "logging_internal.h" +#include "atscppapi/noncopyable.h" +#include "utils_internal.h" + +#include <cstdlib> +#include <cerrno> + +#ifndef INT64_MAX +#define INT64_MAX (9223372036854775807LL) +#endif + +using namespace atscppapi; +using std::string; + +/** + * @private + */ +struct InterceptPlugin::State { + TSCont cont_; + TSVConn net_vc_; + + struct IoHandle { + TSVIO vio_; + TSIOBuffer buffer_; + TSIOBufferReader reader_; + IoHandle() : vio_(NULL), buffer_(NULL), reader_(NULL) { }; + ~IoHandle() { + if (reader_) { + TSIOBufferReaderFree(reader_); + } + if (buffer_) { + TSIOBufferDestroy(buffer_); + } + }; + }; + + IoHandle input_; + IoHandle output_; + + /** the API doesn't recognize end of input; so we have to explicitly + * figure out when to continue reading and when to stop */ + TSHttpParser http_parser_; + int expected_body_size_; + int num_body_bytes_read_; + bool hdr_parsed_; + + TSMBuffer hdr_buf_; + TSMLoc hdr_loc_; + int num_bytes_written_; + + State(TSCont cont) : cont_(cont), net_vc_(NULL), expected_body_size_(0), num_body_bytes_read_(0), + hdr_parsed_(false), hdr_buf_(NULL), hdr_loc_(NULL), num_bytes_written_(0) { + http_parser_ = TSHttpParserCreate(); + } + + ~State() { + TSHttpParserDestroy(http_parser_); + if (hdr_loc_) { + TSHandleMLocRelease(hdr_buf_, TS_NULL_MLOC, hdr_loc_); + } + if (hdr_buf_) { + TSMBufferDestroy(hdr_buf_); + } + } +}; + +namespace { + +int handleEvents(TSCont cont, TSEvent event, void *edata); + +} + +InterceptPlugin::InterceptPlugin(Transaction &transaction, InterceptPlugin::Type type) + : TransactionPlugin(transaction) { + TSCont cont = TSContCreate(handleEvents, TSMutexCreate()); + TSContDataSet(cont, this); + state_ = new State(cont); + TSHttpTxn txn = static_cast<TSHttpTxn>(transaction.getAtsHandle()); + if (type == SERVER_INTERCEPT) { + TSHttpTxnServerIntercept(cont, txn); + } + else { + TSHttpTxnIntercept(cont, txn); + } + Headers &request_headers = transaction.getClientRequest().getHeaders(); + string content_length_str = request_headers.value("Content-Length"); + if (!content_length_str.empty()) { + const char *start_ptr = content_length_str.data(); + char *end_ptr; + int content_length = strtol(start_ptr, &end_ptr, 10 /* base */); + if ((errno != ERANGE) && (end_ptr != start_ptr) && (*end_ptr == '\0')) { + LOG_DEBUG("Got content length: %d", content_length); + state_->expected_body_size_ = content_length; + } + else { + LOG_ERROR("Invalid content length header [%s]; Assuming no content", content_length_str.c_str()); + } + } + if (request_headers.value("Transfer-Encoding") == "chunked") { + // implementing a "dechunker" is non-trivial and in the real + // world, most browsers don't send chunked requests + LOG_ERROR("Support for chunked request not implemented! Assuming no body"); + } + LOG_DEBUG("Expecting %d bytes of request body", state_->expected_body_size_); +} + +InterceptPlugin::~InterceptPlugin() { + TSContDestroy(state_->cont_); + delete state_; +} + +bool InterceptPlugin::produce(const void *data, int data_size) { + ScopedSharedMutexLock scopedLock(getMutex()); + if (!state_->net_vc_) { + LOG_ERROR("Intercept not operational yet"); + return false; + } + if (!state_->output_.buffer_) { + state_->output_.buffer_ = TSIOBufferCreate(); + state_->output_.reader_ = TSIOBufferReaderAlloc(state_->output_.buffer_); + state_->output_.vio_ = TSVConnWrite(state_->net_vc_, state_->cont_, state_->output_.reader_, INT64_MAX); + } + int num_bytes_written = TSIOBufferWrite(state_->output_.buffer_, data, data_size); + if (num_bytes_written != data_size) { + LOG_ERROR("Error while writing to buffer! Attempted %d bytes but only wrote %d bytes", data_size, + num_bytes_written); + return false; + } + TSVIOReenable(state_->output_.vio_); + state_->num_bytes_written_ += data_size; + LOG_DEBUG("Wrote %d bytes in response", data_size); + return true; +} + +bool InterceptPlugin::setOutputComplete() { + ScopedSharedMutexLock scopedLock(getMutex()); + if (!state_->net_vc_) { + LOG_ERROR("Intercept not operational yet"); + return false; + } + if (!state_->output_.buffer_) { + LOG_ERROR("No output produced so far"); + return false; + } + TSVIONBytesSet(state_->output_.vio_, state_->num_bytes_written_); + TSVIOReenable(state_->output_.vio_); + LOG_DEBUG("Response complete"); + return true; +} + +bool InterceptPlugin::doRead() { + int avail = TSIOBufferReaderAvail(state_->input_.reader_); + if (avail == TS_ERROR) { + LOG_ERROR("Error while getting number of bytes available"); + return false; + } + + int consumed = 0; // consumed is used to update the input buffers + if (avail > 0) { + int64_t num_body_bytes_in_block; + int64_t data_len; // size of all data (header + body) in a block + const char *data, *startptr; + TSIOBufferBlock block = TSIOBufferReaderStart(state_->input_.reader_); + while (block != NULL) { + startptr = data = TSIOBufferBlockReadStart(block, state_->input_.reader_, &data_len); + num_body_bytes_in_block = 0; + if (!state_->hdr_parsed_) { + const char *endptr = data + data_len; + if (TSHttpHdrParseReq(state_->http_parser_, state_->hdr_buf_, state_->hdr_loc_, &data, + endptr) == TS_PARSE_DONE) { + LOG_DEBUG("Parsed header"); + state_->hdr_parsed_ = true; + // remaining data in this block is body; 'data' will be pointing to first byte of the body + num_body_bytes_in_block = endptr - data; + } + ScopedSharedMutexLock scopedLock(getMutex()); + consume(string(startptr, data - startptr), InterceptPlugin::REQUEST_HEADER); + } + else { + num_body_bytes_in_block = data_len; + } + if (num_body_bytes_in_block) { + state_->num_body_bytes_read_ += num_body_bytes_in_block; + ScopedSharedMutexLock scopedLock(getMutex()); + consume(string(data, num_body_bytes_in_block), InterceptPlugin::REQUEST_BODY); + } + consumed += data_len; + block = TSIOBufferBlockNext(block); + } + } + LOG_DEBUG("Consumed %d bytes from input vio", consumed); + TSIOBufferReaderConsume(state_->input_.reader_, consumed); + + // Modify the input VIO to reflect how much data we've completed. + TSVIONDoneSet(state_->input_.vio_, TSVIONDoneGet(state_->input_.vio_) + consumed); + + if ((state_->hdr_parsed_) && (state_->num_body_bytes_read_ >= state_->expected_body_size_)) { + LOG_DEBUG("Completely read body"); + if (state_->num_body_bytes_read_ > state_->expected_body_size_) { + LOG_ERROR("Read more data than specified in request"); + // TODO: any further action required? + } + ScopedSharedMutexLock scopedLock(getMutex()); + handleInputComplete(); + } + else { + LOG_DEBUG("Reenabling input vio as %d bytes still need to be read", + state_->expected_body_size_ - state_->num_body_bytes_read_); + TSVIOReenable(state_->input_.vio_); + } + return true; +} + +void InterceptPlugin::handleEvent(int abstract_event, void *edata) { + TSEvent event = static_cast<TSEvent>(abstract_event); + LOG_DEBUG("Received event %d", event); + + switch (event) { + + case TS_EVENT_NET_ACCEPT: + LOG_DEBUG("Handling net accept"); + state_->net_vc_ = static_cast<TSVConn>(edata); + state_->input_.buffer_ = TSIOBufferCreate(); + state_->input_.reader_ = TSIOBufferReaderAlloc(state_->input_.buffer_); + state_->input_.vio_ = TSVConnRead(state_->net_vc_, state_->cont_, state_->input_.buffer_, + INT64_MAX /* number of bytes to read - high value initially */); + + state_->hdr_buf_ = TSMBufferCreate(); + state_->hdr_loc_ = TSHttpHdrCreate(state_->hdr_buf_); + TSHttpHdrTypeSet(state_->hdr_buf_, state_->hdr_loc_, TS_HTTP_TYPE_REQUEST); + break; + + case TS_EVENT_VCONN_READ_READY: + LOG_DEBUG("Handling read ready"); + if (doRead()) { + break; + } + // else fall through into the next shut down cases + LOG_ERROR("Error while reading request!"); + case TS_EVENT_VCONN_READ_COMPLETE: // fall throughs intentional + case TS_EVENT_VCONN_WRITE_COMPLETE: + case TS_EVENT_VCONN_EOS: + case TS_EVENT_ERROR: // erroring out, nothing more to do + case TS_EVENT_NET_ACCEPT_FAILED: // somebody canceled the transaction + if (event == TS_EVENT_ERROR) { + LOG_ERROR("Unknown Error!"); + } + else if (event == TS_EVENT_NET_ACCEPT_FAILED) { + LOG_ERROR("Got net_accept_failed!"); + } + LOG_DEBUG("Shutting down"); + if (state_->net_vc_) { + TSVConnClose(state_->net_vc_); + } + break; + + default: + LOG_ERROR("Unknown event %d", event); + } +} + +namespace { + +int handleEvents(TSCont cont, TSEvent event, void *edata) { + InterceptPlugin *plugin = static_cast<InterceptPlugin *>(TSContDataGet(cont)); + utils::internal::dispatchInterceptEvent(plugin, event, edata); + return 0; +} + +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/51fc9ff9/lib/atscppapi/src/include/atscppapi/InterceptPlugin.h ---------------------------------------------------------------------- diff --git a/lib/atscppapi/src/include/atscppapi/InterceptPlugin.h b/lib/atscppapi/src/include/atscppapi/InterceptPlugin.h new file mode 100644 index 0000000..5160c8d --- /dev/null +++ b/lib/atscppapi/src/include/atscppapi/InterceptPlugin.h @@ -0,0 +1,98 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +/** + * @file InterceptPlugin.h + */ + +#pragma once +#ifndef ATSCPPAPI_INTERCEPT_PLUGIN_H_ +#define ATSCPPAPI_INTERCEPT_PLUGIN_H_ + +#include <string> +#include <atscppapi/Transaction.h> +#include <atscppapi/TransactionPlugin.h> + +namespace atscppapi { + + + +/** + * Allows a plugin to act as a server and return the response. This + * plugin can be created in read request headers hook (pre or post + * remap). + */ +class InterceptPlugin : public TransactionPlugin { +protected: + /** + * The available types of intercepts. + */ + enum Type { + SERVER_INTERCEPT = 0, /**< Plugin will act as origin */ + TRANSACTION_INTERCEPT /**< Plugin will act as cache and origin (on cache miss) */ + }; + + /** a plugin must implement this interface, it cannot be constructed directly */ + InterceptPlugin(Transaction &transaction, Type type); + +public: + enum RequestDataType { + REQUEST_HEADER = 0, + REQUEST_BODY + }; + + /** + * A method that you must implement when writing an InterceptPlugin, this method will be + * invoked whenever client request data is read. + */ + virtual void consume(const std::string &data, RequestDataType type) = 0; + + /** + * A method that you must implement when writing an InterceptPlugin, this method + * will be invoked when the client request is deemed complete. + */ + virtual void handleInputComplete() = 0; + + virtual ~InterceptPlugin(); + + struct State; /** Internal use only */ + +protected: + /** + * This method is how an InterceptPlugin will send output back to + * the client. + */ + bool produce(const void *data, int data_size); + + bool produce(const std::string &data) { return produce(data.data(), data.size()); } + + bool setOutputComplete(); + +private: + State *state_; + + bool doRead(); + void handleEvent(int, void *); + + friend class utils::internal; +}; + +} /* atscppapi */ + + +#endif /* ATSCPPAPI_INTERCEPT_PLUGIN_H_ */
