Repository: trafficserver Updated Branches: refs/heads/master d6950eb32 -> 993339627
TS-3961: Open source Yahoo's ats-multiplexer plug-in This closes #302 Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/99333962 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/99333962 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/99333962 Branch: refs/heads/master Commit: 99333962777f81e0c654a685ad53603524b060ad Parents: d6950eb Author: Daniel Vitor Morilha <[email protected]> Authored: Mon Oct 26 10:52:34 2015 -0700 Committer: Bryan Call <[email protected]> Committed: Mon Oct 26 10:56:19 2015 -0700 ---------------------------------------------------------------------- NOTICE | 3 + configure.ac | 1 + plugins/experimental/Makefile.am | 1 + plugins/experimental/multiplexer/Makefile.am | 32 ++ plugins/experimental/multiplexer/README | 34 +++ .../experimental/multiplexer/ats-multiplexer.cc | 161 ++++++++++ .../experimental/multiplexer/chunk-decoder.cc | 158 ++++++++++ .../experimental/multiplexer/chunk-decoder.h | 66 ++++ plugins/experimental/multiplexer/dispatch.cc | 243 +++++++++++++++ plugins/experimental/multiplexer/dispatch.h | 70 +++++ plugins/experimental/multiplexer/fetcher.cc | 64 ++++ plugins/experimental/multiplexer/fetcher.h | 303 +++++++++++++++++++ .../multiplexer/original-request.cc | 129 ++++++++ .../experimental/multiplexer/original-request.h | 63 ++++ plugins/experimental/multiplexer/post.cc | 138 +++++++++ plugins/experimental/multiplexer/post.h | 43 +++ plugins/experimental/multiplexer/remap.config | 19 ++ plugins/experimental/multiplexer/ts.cc | 39 +++ plugins/experimental/multiplexer/ts.h | 67 ++++ 19 files changed, 1634 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index 453af9b..a90017f 100644 --- a/NOTICE +++ b/NOTICE @@ -75,6 +75,9 @@ Copyright (C) 2012 Google Inc. collapsed_collapsing: Plugin for connection collapsing to origin Copyright (C) 2014 Yahoo! Inc. All rights reserved. +multiplexer: Plugin for request multiplixing +Copyright (C) 2015 Yahoo! Inc. All rights reserved. + ~~~ healthchecks: Plugin for ATS healthchecks. http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/configure.ac ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index b39e56e..1a5c7f6 100644 --- a/configure.ac +++ b/configure.ac @@ -1913,6 +1913,7 @@ AS_IF([test "x$enable_experimental_plugins" = "xyes"], [ plugins/experimental/hipes/Makefile plugins/experimental/memcached_remap/Makefile plugins/experimental/metalink/Makefile + plugins/experimental/multiplexer/Makefile plugins/experimental/mysql_remap/Makefile plugins/experimental/regex_revalidate/Makefile plugins/experimental/remap_stats/Makefile http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/Makefile.am ---------------------------------------------------------------------- diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am index 98f8293..0ec2e37 100644 --- a/plugins/experimental/Makefile.am +++ b/plugins/experimental/Makefile.am @@ -32,6 +32,7 @@ SUBDIRS = \ healthchecks \ hipes \ metalink \ + multiplexer \ memcached_remap \ regex_revalidate \ remap_stats \ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/Makefile.am ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/Makefile.am b/plugins/experimental/multiplexer/Makefile.am new file mode 100644 index 0000000..e93d45c --- /dev/null +++ b/plugins/experimental/multiplexer/Makefile.am @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include $(top_srcdir)/build/plugins.mk + +AM_CPPFLAGS += -DPLUGIN_TAG=\"multiplexer\" + +pkglib_LTLIBRARIES = multiplexer.la + +multiplexer_la_SOURCES = \ + ats-multiplexer.cc \ + chunk-decoder.cc \ + dispatch.cc \ + fetcher.cc \ + original-request.cc \ + post.cc \ + ts.cc + +multiplexer_la_LDFLAGS = $(TS_PLUGIN_LDFLAGS) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/README ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/README b/plugins/experimental/multiplexer/README new file mode 100644 index 0000000..5681928 --- /dev/null +++ b/plugins/experimental/multiplexer/README @@ -0,0 +1,34 @@ +ATS (Apache Traffic Server) Multiplexer plug-in +----------------------------------------------- + +This is a remap plug-in that allows a request to be multiplexed one or more times + and sent to different remap entries. Both headers and body (in case of POST or + PUT methods, only) are copied into the new requests. + +Multiplexer: + 1. adds "X-Multiplexer: original" header into client's request. + 2. copies client's request (bodies are copied by transforming the request) + 3. changes Host header of the copy according to pparam. + 4. changes X-Multiplexer header to "copy". + 5. sends the copied request with TSHttpConnect. + +Multiplexer dispatches the request in background without blocking the original + request. Multiplexed responses are drained and discarded. + +A global timeout can be overwritten through "multiplexer__timeout" environment variable representing how many nanoseconds to wait. A default 1s timeout is hard-coded. + +Please use "mutiplexer" tag for debugging purposes. While debugging, multiplexed requests and responses are printed into the logs. + +Multiplexer produces the following statistics consumed with traffic_line: + - failures: number of failed multiplexed requests + - hits: number of successful multiplexed requests + - requests: total number of multiplexed requests + - time(avg): average time taken between multiplexed requests and their responses + - timeouts: number of multiplexed requests which timed-out + - size(avg): average size of multiplexed responses + +Example remap.config: + map http://www.example.com/a http://www.example.com/ @plugin=multiplexer.so @pparam=host1.example.com + map http://www.example.com/b http://www.example.com/ @plugin=multiplexer.so @pparam=host2.example.com + map http://www.example.com/c http://www.example.com/ @plugin=multiplexer.so @pparam=host1.example.com @pparam=host2.example.com + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ats-multiplexer.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/ats-multiplexer.cc b/plugins/experimental/multiplexer/ats-multiplexer.cc new file mode 100644 index 0000000..adbfefe --- /dev/null +++ b/plugins/experimental/multiplexer/ats-multiplexer.cc @@ -0,0 +1,161 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <algorithm> +#include <ts/ts.h> +#include <ts/remap.h> + +#include <inttypes.h> + +#include "dispatch.h" +#include "fetcher.h" +#include "original-request.h" +#include "post.h" + +#ifndef PLUGIN_TAG +#error Please define a PLUGIN_TAG before including this file. +#endif + +// 1s +const size_t DEFAULT_TIMEOUT = 1000000000000; + +Statistics statistics; + +TSReturnCode +TSRemapInit(TSRemapInterface *, char *, int) +{ + { + timeout = 0; + const char *const timeoutEnv = getenv(PLUGIN_TAG "__timeout"); + if (timeoutEnv != NULL) { + timeout = atol(timeoutEnv); + } + if (timeout < 1) { + timeout = DEFAULT_TIMEOUT; + } + TSDebug(PLUGIN_TAG, "timeout is set to: %zu", timeout); + } + + statistics.failures = TSStatCreate(PLUGIN_TAG ".failures", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT); + + statistics.hits = TSStatCreate(PLUGIN_TAG ".hits", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT); + + statistics.time = TSStatCreate(PLUGIN_TAG ".time", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_AVG); + + statistics.requests = TSStatCreate(PLUGIN_TAG ".requests", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT); + + statistics.timeouts = TSStatCreate(PLUGIN_TAG ".timeouts", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_COUNT); + + statistics.size = TSStatCreate(PLUGIN_TAG ".size", TS_RECORDDATATYPE_INT, TS_STAT_NON_PERSISTENT, TS_STAT_SYNC_AVG); + + return TS_SUCCESS; +} + +TSReturnCode +TSRemapNewInstance(int argc, char **argv, void **i, char *, int) +{ + assert(i != NULL); + Instance *instance = new Instance; + + if (argc > 2) { + std::copy(argv + 2, argv + argc, std::back_inserter(instance->origins)); + } + + *i = static_cast<void *>(instance); + + return TS_SUCCESS; +} + +void +TSRemapDeleteInstance(void *i) +{ + assert(i != NULL); + delete static_cast<Instance *>(i); +} + +void +DoRemap(const Instance &i, TSHttpTxn t) +{ + assert(t != NULL); + /* + if (POST || PUT) { + transformRequest + } + */ + TSMBuffer buffer; + TSMLoc location; + + CHECK(TSHttpTxnClientReqGet(t, &buffer, &location)); + + assert(buffer != NULL); + assert(location != NULL); + + { + TSMLoc field; + + CHECK(TSMimeHdrFieldCreateNamed(buffer, location, "X-Multiplexer", 13, &field)); + assert(field != NULL); + + CHECK(TSMimeHdrFieldValueStringSet(buffer, location, field, -1, "original", 8)); + + CHECK(TSMimeHdrFieldAppend(buffer, location, field)); + } + + Requests requests; + generateRequests(i.origins, buffer, location, requests); + assert(requests.size() == i.origins.size()); + + int length; + const char *const method = TSHttpHdrMethodGet(buffer, location, &length); + + TSDebug(PLUGIN_TAG, "Method is %s.", std::string(method, length).c_str()); + + if (length == TS_HTTP_LEN_POST && memcmp(TS_HTTP_METHOD_POST, method, TS_HTTP_LEN_POST) == 0) { + const TSVConn vconnection = TSTransformCreate(handlePost, t); + assert(vconnection != NULL); + TSContDataSet(vconnection, new PostState(requests)); + assert(requests.empty()); + TSHttpTxnHookAdd(t, TS_HTTP_REQUEST_TRANSFORM_HOOK, vconnection); + } else { + dispatch(requests, timeout); + } + + TSHandleMLocRelease(buffer, TS_NULL_MLOC, location); + + TSStatIntIncrement(statistics.requests, 1); +} + +TSRemapStatus +TSRemapDoRemap(void *i, TSHttpTxn t, TSRemapRequestInfo *r) +{ + assert(i != NULL); + assert(t != NULL); + const Instance *const instance = static_cast<const Instance *>(i); + + if (!instance->origins.empty() && TSHttpTxnIsInternal(t) != TS_SUCCESS) { + DoRemap(*instance, t); + } else { + TSDebug(PLUGIN_TAG, "Skipping transaction %p", t); + } + + return TSREMAP_NO_REMAP; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/chunk-decoder.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/chunk-decoder.cc b/plugins/experimental/multiplexer/chunk-decoder.cc new file mode 100644 index 0000000..113cf47 --- /dev/null +++ b/plugins/experimental/multiplexer/chunk-decoder.cc @@ -0,0 +1,158 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <algorithm> +#include <assert.h> + +#include "chunk-decoder.h" + +void +ChunkDecoder::parseSizeCharacter(const char a) +{ + assert(state_ == State::kSize); + if (a >= '0' && a <= '9') { + size_ = (size_ << 4) | (a - '0'); + } else if (a >= 'A' && a <= 'F') { + size_ = (size_ << 4) | (a - 'A' + 10); + } else if (a >= 'a' && a <= 'f') { + size_ = (size_ << 4) | (a - 'a' + 10); + } else if (a == '\r') { + state_ = size_ == 0 ? State::kEndN : State::kDataN; + } else { + assert(false); // invalid input + } +} + +int +ChunkDecoder::parseSize(const char *p, const int64_t s) +{ + assert(p != NULL); + assert(s > 0); + int length = 0; + while (state_ != State::kData && *p != '\0' && length < s) { + assert(state_ < State::kUpperBound); // VALID RANGE + switch (state_) { + case State::kUnknown: + case State::kData: + case State::kInvalid: + case State::kEnd: + case State::kUpperBound: + assert(false); + break; + + case State::kDataN: + assert(*p == '\n'); + state_ = (*p == '\n') ? State::kData : State::kInvalid; + break; + + case State::kEndN: + assert(*p == '\n'); + state_ = (*p == '\n') ? State::kEnd : State::kInvalid; + return length; + + case State::kSizeR: + assert(*p == '\r'); + state_ = (*p == '\r') ? State::kSizeN : State::kInvalid; + break; + + case State::kSizeN: + assert(*p == '\n'); + state_ = (*p == '\n') ? State::kSize : State::kInvalid; + break; + + case State::kSize: + parseSizeCharacter(*p); + break; + } + ++length; + ++p; + assert(state_ != State::kInvalid); + } + return length; +} + +bool +ChunkDecoder::isSizeState(void) const +{ + return state_ == State::kDataN || state_ == State::kEndN || state_ == State::kSize || state_ == State::kSizeN || + state_ == State::kSizeR; +} + +int +ChunkDecoder::decode(const TSIOBufferReader &r) +{ + assert(r != NULL); + + if (state_ == State::kEnd) { + return 0; + } + + { + const int l = TSIOBufferReaderAvail(r); + if (l < size_) { + size_ -= l; + return l; + } + } + + int64_t size; + TSIOBufferBlock block = TSIOBufferReaderStart(r); + + if (isSizeState()) { + while (block != NULL && size_ == 0) { + const char *p = TSIOBufferBlockReadStart(block, r, &size); + assert(p != NULL); + const int i = parseSize(p, size); + size -= i; + TSIOBufferReaderConsume(r, i); + if (state_ == State::kEnd) { + assert(size_ == 0); + return 0; + } + if (isSizeState()) { + assert(size == 0); + block = TSIOBufferBlockNext(block); + } + } + } + + int length = 0; + + while (block != NULL && state_ == State::kData) { + assert(size_ > 0); + const char *p = TSIOBufferBlockReadStart(block, r, &size); + if (p != NULL) { + if (size > size_) { + length += size_; + size_ = 0; + state_ = State::kSizeR; + break; + } else { + length += size; + size_ -= size; + } + } + block = TSIOBufferBlockNext(block); + } + + return length; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/chunk-decoder.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/chunk-decoder.h b/plugins/experimental/multiplexer/chunk-decoder.h new file mode 100644 index 0000000..d81a5fe --- /dev/null +++ b/plugins/experimental/multiplexer/chunk-decoder.h @@ -0,0 +1,66 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef CHUNK_DECODER_H +#define CHUNK_DECODER_H + +#include <ts/ts.h> +#include <inttypes.h> + +class ChunkDecoder +{ + struct State { + enum STATES { + kUnknown, + + kInvalid, + + kData, + kDataN, + kEnd, + kEndN, + kSize, + kSizeN, + kSizeR, + + kUpperBound, + }; + }; + + State::STATES state_; + int64_t size_; + +public: + ChunkDecoder(void) : state_(State::kSize), size_(0) {} + void parseSizeCharacter(const char); + int parseSize(const char *, const int64_t); + int decode(const TSIOBufferReader &); + bool isSizeState(void) const; + + inline bool + isEnd(void) const + { + return state_ == State::kEnd; + } +}; + +#endif // CHUNK_DECODER_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/dispatch.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/dispatch.cc b/plugins/experimental/multiplexer/dispatch.cc new file mode 100644 index 0000000..44bb6f5 --- /dev/null +++ b/plugins/experimental/multiplexer/dispatch.cc @@ -0,0 +1,243 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <inttypes.h> + +#include "dispatch.h" +#include "fetcher.h" +#include "original-request.h" + +#ifndef PLUGIN_TAG +#error Please define a PLUGIN_TAG before including this file. +#endif + +extern Statistics statistics; + +extern size_t timeout; + +Request::Request(const std::string &h, const TSMBuffer b, const TSMLoc l) + : host(h), length(TSHttpHdrLengthGet(b, l)), io(new ats::io::IO()) +{ + assert(!host.empty()); + assert(b != NULL); + assert(l != NULL); + assert(io != NULL); + assert(length > 0); + TSHttpHdrPrint(b, l, io->buffer); + assert(length == TSIOBufferReaderAvail(io->reader)); +} + +uint64_t +copy(const TSIOBufferReader &r, const TSIOBuffer b) +{ + assert(r != NULL); + assert(b != NULL); + TSIOBufferBlock block = TSIOBufferReaderStart(r); + + uint64_t length = 0; + + for (; block; block = TSIOBufferBlockNext(block)) { + int64_t size = 0; + const void *const pointer = TSIOBufferBlockReadStart(block, r, &size); + + if (pointer != NULL && size > 0) { + const int64_t size2 = TSIOBufferWrite(b, pointer, size); + assert(size == size2); + length += size; + } + } + + return length; +} + +uint64_t +read(const TSIOBufferReader &r, std::string &o, int64_t l = 0) +{ + assert(r != NULL); + TSIOBufferBlock block = TSIOBufferReaderStart(r); + + assert(l >= 0); + if (l == 0) { + l = TSIOBufferReaderAvail(r); + assert(l >= 0); + } + + uint64_t length = 0; + + for (; block && l > 0; block = TSIOBufferBlockNext(block)) { + int64_t size = 0; + const char *const pointer = TSIOBufferBlockReadStart(block, r, &size); + if (pointer != NULL && size > 0) { + size = std::min(size, l); + o.append(pointer, size); + length += size; + l -= size; + } + } + + return length; +} + +uint64_t +read(const TSIOBuffer &b, std::string &o, const int64_t l = 0) +{ + TSIOBufferReader reader = TSIOBufferReaderAlloc(b); + const uint64_t length = read(reader, o); + TSIOBufferReaderFree(reader); + return length; +} + +class Handler +{ + int64_t length; + struct timespec start; + std::string response; + +public: + const std::string url; + + Handler(std::string u) : length(0) + { + assert(!u.empty()); + const_cast<std::string &>(url).swap(u); + clock_gettime(CLOCK_MONOTONIC, &start); + } + + void + error(void) + { + TSError("[" PLUGIN_TAG "] error when communicating with \"%s\"\n", url.c_str()); + TSStatIntIncrement(statistics.failures, 1); + } + + void + timeout(void) + { + TSError("[" PLUGIN_TAG "] timeout when communicating with \"%s\"\n", url.c_str()); + TSStatIntIncrement(statistics.timeouts, 1); + } + + void + header(const TSMBuffer b, const TSMLoc l) + { + if (TSIsDebugTagSet(PLUGIN_TAG) > 0) { + const TSIOBuffer buffer = TSIOBufferCreate(); + TSHttpHdrPrint(b, l, buffer); + std::string b; + read(buffer, b); + TSDebug(PLUGIN_TAG, "Response header for \"%s\" was:\n%s", url.c_str(), b.c_str()); + TSIOBufferDestroy(buffer); + } + } + + void + data(const TSIOBufferReader r, const int64_t l) + { + length += l; + if (TSIsDebugTagSet(PLUGIN_TAG) > 0) { + std::string buffer; + const uint64_t length = read(r, buffer, l); + response += buffer; + TSDebug(PLUGIN_TAG, "Receiving response chunk \"%s\" of %" PRIu64 " bytes", buffer.c_str(), length); + } + } + + void + done(void) + { + struct timespec end; + + clock_gettime(CLOCK_MONOTONIC, &end); + + if (TSIsDebugTagSet(PLUGIN_TAG) > 0) { + TSDebug(PLUGIN_TAG, "Response for \"%s\" was:\n%s", url.c_str(), response.c_str()); + } + + const long diff = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_nsec - start.tv_nsec) / 1000; + + TSStatIntIncrement(statistics.hits, 1); + TSStatIntIncrement(statistics.time, diff); + TSStatIntIncrement(statistics.size, length); + } +}; + +void +generateRequests(const Origins &o, const TSMBuffer buffer, const TSMLoc location, Requests &r) +{ + assert(!o.empty()); + assert(buffer != NULL); + assert(location != NULL); + + Origins::const_iterator iterator = o.begin(); + const Origins::const_iterator end = o.end(); + + OriginalRequest request(buffer, location); + request.urlScheme(""); + request.urlHost(""); + request.xMultiplexerHeader("copy"); + + for (; iterator != end; ++iterator) { + const std::string &host = *iterator; + assert(!host.empty()); + request.hostHeader(host); + r.push_back(Request(host, buffer, location)); + } +} + +void +addBody(Requests &r, const TSIOBufferReader re) +{ + assert(re != NULL); + Requests::iterator iterator = r.begin(); + const Requests::iterator end = r.end(); + const int64_t length = TSIOBufferReaderAvail(re); + if (length == 0) { + return; + } + assert(length > 0); + for (; iterator != end; ++iterator) { + assert(iterator->io != NULL); + const int64_t size = copy(re, iterator->io->buffer); + assert(size == length); + iterator->length += size; + } +} + +void +dispatch(Requests &r, const int t) +{ + Requests::iterator iterator = r.begin(); + const Requests::iterator end = r.end(); + for (; iterator != end; ++iterator) { + assert(iterator->io != NULL); + if (TSIsDebugTagSet(PLUGIN_TAG) > 0) { + TSDebug(PLUGIN_TAG, "Dispatching %i bytes to \"%s\"", iterator->length, iterator->host.c_str()); + std::string b; + read(iterator->io->reader, b); + assert(b.size() == static_cast<uint64_t>(iterator->length)); + TSDebug(PLUGIN_TAG, "%s", b.c_str()); + } + ats::get(iterator->io, iterator->length, Handler(iterator->host), t); + // forwarding iterator->io pointer ownership + iterator->io = NULL; + } +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/dispatch.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/dispatch.h b/plugins/experimental/multiplexer/dispatch.h new file mode 100644 index 0000000..e152742 --- /dev/null +++ b/plugins/experimental/multiplexer/dispatch.h @@ -0,0 +1,70 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef DISPATCH_H +#define DISPATCH_H + +#include <assert.h> +#include <string> +#include <ts/ts.h> +#include <vector> + +#include "ts.h" + +#define CHECK(X) \ + { \ + const TSReturnCode r = static_cast<TSReturnCode>(X); \ + assert(r == TS_SUCCESS); \ + } + +struct Statistics { + int failures; + int hits; + int time; // average + int requests; + int timeouts; + int size; // average +}; + +typedef std::vector<std::string> Origins; + +struct Request { + std::string host; + int length; + ats::io::IO *io; + + Request(const std::string &, const TSMBuffer, const TSMLoc); +}; + +typedef std::vector<Request> Requests; + +struct Instance { + Origins origins; +}; + +extern size_t timeout; + +void generateRequests(const Origins &, const TSMBuffer, const TSMLoc, Requests &); +void addBody(Requests &, const TSIOBufferReader); +void dispatch(Requests &, const int timeout = 0); + +#endif // DISPATCH_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/fetcher.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/fetcher.cc b/plugins/experimental/multiplexer/fetcher.cc new file mode 100644 index 0000000..7b60f1d --- /dev/null +++ b/plugins/experimental/multiplexer/fetcher.cc @@ -0,0 +1,64 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include "fetcher.h" + +namespace ats +{ +void +HttpParser::destroyParser(void) +{ + if (parser_ != NULL) { + TSHttpParserClear(parser_); + TSHttpParserDestroy(parser_); + parser_ = NULL; + } +} + +bool +HttpParser::parse(io::IO &io) +{ + if (parsed_) { + return true; + } + TSIOBufferBlock block = TSIOBufferReaderStart(io.reader); + while (block != NULL) { + int64_t size = 0; + const char *const begin = TSIOBufferBlockReadStart(block, io.reader, &size); + const char *iterator = begin; + + parsed_ = (TSHttpHdrParseResp(parser_, buffer_, location_, &iterator, iterator + size) == TS_PARSE_DONE); + TSIOBufferReaderConsume(io.reader, iterator - begin); + + if (parsed_) { + TSDebug(PLUGIN_TAG, "HttpParser: response parsing is complete (%u response status code)", statusCode()); + assert(parser_ != NULL); + destroyParser(); + return true; + } + + block = TSIOBufferBlockNext(block); + } + return false; +} + +} // end of ats namespace http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/fetcher.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/fetcher.h b/plugins/experimental/multiplexer/fetcher.h new file mode 100644 index 0000000..dcaa93b --- /dev/null +++ b/plugins/experimental/multiplexer/fetcher.h @@ -0,0 +1,303 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef NEW_FETCHER_H +#define NEW_FETCHER_H + +#include <arpa/inet.h> +#include <iostream> +#include <limits> + +#include <inttypes.h> + +#include "chunk-decoder.h" +#include "ts.h" + +#ifndef PLUGIN_TAG +#error Please define a PLUGIN_TAG before including this file. +#endif + +#define unlikely(x) __builtin_expect((x), 0) + +namespace ats +{ +struct HttpParser { + bool parsed_; + TSHttpParser parser_; + TSMBuffer buffer_; + TSMLoc location_; + + void destroyParser(void); + + ~HttpParser() + { + TSHandleMLocRelease(buffer_, TS_NULL_MLOC, location_); + TSMBufferDestroy(buffer_); + destroyParser(); + } + + HttpParser(void) : parsed_(false), parser_(TSHttpParserCreate()), buffer_(TSMBufferCreate()), location_(TSHttpHdrCreate(buffer_)) + { + TSHttpHdrTypeSet(buffer_, location_, TS_HTTP_TYPE_RESPONSE); + } + + bool parse(io::IO &); + + int + statusCode(void) const + { + return static_cast<int>(TSHttpHdrStatusGet(buffer_, location_)); + } +}; + +template <class T> struct HttpTransaction { + typedef HttpTransaction<T> Self; + + bool parsingHeaders_; + bool abort_; + bool timeout_; + io::IO *in_; + io::IO *out_; + TSVConn vconnection_; + TSCont continuation_; + T t_; + HttpParser parser_; + ChunkDecoder *chunkDecoder_; + + ~HttpTransaction() + { + if (in_ != NULL) { + delete in_; + in_ = NULL; + } + if (out_ != NULL) { + delete out_; + out_ = NULL; + } + timeout(0); + assert(vconnection_ != NULL); + if (abort_) { + TSVConnAbort(vconnection_, TS_VC_CLOSE_ABORT); + } else { + TSVConnClose(vconnection_); + } + assert(continuation_ != NULL); + TSContDestroy(continuation_); + if (chunkDecoder_ != NULL) { + delete chunkDecoder_; + } + } + + HttpTransaction(TSVConn v, TSCont c, io::IO *const i, const uint64_t l, const T &t) + : parsingHeaders_(false), abort_(false), timeout_(false), in_(NULL), out_(i), vconnection_(v), continuation_(c), t_(t), + chunkDecoder_(NULL) + { + assert(vconnection_ != NULL); + assert(continuation_ != NULL); + assert(out_ != NULL); + assert(l > 0); + out_->vio = TSVConnWrite(vconnection_, continuation_, out_->reader, l); + } + + inline void + abort(const bool b = true) + { + abort_ = b; + } + + void + timeout(const int64_t t) + { + assert(t >= 0); + assert(vconnection_ != NULL); + if (timeout_) { + TSVConnActiveTimeoutCancel(vconnection_); + timeout_ = false; + } else { + TSVConnActiveTimeoutSet(vconnection_, t); + timeout_ = true; + } + } + + static void + close(Self *const s) + { + assert(s != NULL); + TSVConnShutdown(s->vconnection_, 1, 0); + delete s; + } + + static bool + isChunkEncoding(const TSMBuffer b, const TSMLoc l) + { + assert(b != NULL); + assert(l != NULL); + bool result = false; + const TSMLoc field = TSMimeHdrFieldFind(b, l, TS_MIME_FIELD_TRANSFER_ENCODING, TS_MIME_LEN_TRANSFER_ENCODING); + if (field != NULL) { + int length; + const char *const value = TSMimeHdrFieldValueStringGet(b, l, field, -1, &length); + if (value != NULL && length == TS_HTTP_LEN_CHUNKED) { + result = strncasecmp(value, TS_HTTP_VALUE_CHUNKED, TS_HTTP_LEN_CHUNKED) == 0; + } + TSHandleMLocRelease(b, l, field); + } + return result; + } + + static int + handle(TSCont c, TSEvent e, void *data) + { + Self *const self = static_cast<Self *const>(TSContDataGet(c)); + assert(self != NULL); + switch (e) { + case TS_EVENT_ERROR: + TSDebug(PLUGIN_TAG, "HttpTransaction: ERROR"); + self->t_.error(); + self->abort(); + close(self); + TSContDataSet(c, NULL); + break; + case TS_EVENT_VCONN_EOS: + TSDebug(PLUGIN_TAG, "HttpTransaction: EOS"); + goto here; + + case TS_EVENT_VCONN_READ_COMPLETE: + TSDebug(PLUGIN_TAG, "HttpTransaction: Read Complete"); + goto here; + + case TS_EVENT_VCONN_READ_READY: + TSDebug(PLUGIN_TAG, "HttpTransaction: Read"); + here : { + assert(self->in_ != NULL); + assert(self->in_->reader != NULL); + assert(self->in_->vio != NULL); + int64_t available = TSIOBufferReaderAvail(self->in_->reader); + if (available > 0) { + TSVIONDoneSet(self->in_->vio, available + TSVIONDoneGet(self->in_->vio) + 2); + if (self->parsingHeaders_) { + if (self->parser_.parse(*self->in_)) { + if (isChunkEncoding(self->parser_.buffer_, self->parser_.location_)) { + assert(self->chunkDecoder_ == NULL); + self->chunkDecoder_ = new ChunkDecoder(); + } + self->t_.header(self->parser_.buffer_, self->parser_.location_); + self->parsingHeaders_ = false; + } + } + if (!self->parsingHeaders_) { + if (self->chunkDecoder_ != NULL) { + available = self->chunkDecoder_->decode(self->in_->reader); + do { + self->t_.data(self->in_->reader, available); + TSIOBufferReaderConsume(self->in_->reader, available); + available = self->chunkDecoder_->decode(self->in_->reader); + } while (available > 0); + } else { + self->t_.data(self->in_->reader, available); + TSIOBufferReaderConsume(self->in_->reader, available); + } + } + } + if (e == TS_EVENT_VCONN_READ_COMPLETE || e == TS_EVENT_VCONN_EOS) { + self->t_.done(); + close(self); + TSContDataSet(c, NULL); + } else if (self->chunkDecoder_ != NULL && self->chunkDecoder_->isEnd()) { + assert(self->parsingHeaders_ == false); + assert(isChunkEncoding(self->parser_.buffer_, self->parser_.location_)); + self->abort(); + self->t_.done(); + close(self); + TSContDataSet(c, NULL); + } else { + TSVIOReenable(self->in_->vio); + } + } break; + case TS_EVENT_VCONN_WRITE_COMPLETE: + TSDebug(PLUGIN_TAG, "HttpTransaction: Write Complete"); + self->parsingHeaders_ = true; + assert(self->in_ == NULL); + self->in_ = io::IO::read(self->vconnection_, c); + assert(self->vconnection_); + TSVConnShutdown(self->vconnection_, 0, 1); + assert(self->out_ != NULL); + delete self->out_; + self->out_ = NULL; + break; + case TS_EVENT_VCONN_WRITE_READY: + TSDebug(PLUGIN_TAG, "HttpTransaction: Write Ready (Done: %" PRId64 " Todo: %" PRId64 ")", TSVIONDoneGet(self->out_->vio), + TSVIONTodoGet(self->out_->vio)); + assert(self->out_ != NULL); + TSVIOReenable(self->out_->vio); + break; + case 106: + case TS_EVENT_TIMEOUT: + case TS_EVENT_VCONN_INACTIVITY_TIMEOUT: + TSDebug(PLUGIN_TAG, "HttpTransaction: Timeout"); + self->t_.timeout(); + self->abort(); + close(self); + TSContDataSet(c, NULL); + break; + + default: + assert(false); // UNRECHEABLE. + } + return 0; + } +}; + +template <class T> +bool +get(const std::string &a, io::IO *const i, const int64_t l, const T &t, const int64_t ti = 0) +{ + typedef HttpTransaction<T> Transaction; + struct sockaddr_in socket; + socket.sin_family = AF_INET; + socket.sin_port = 80; + if (!inet_pton(AF_INET, a.c_str(), &socket.sin_addr)) { + TSDebug(PLUGIN_TAG, "ats::get Invalid address provided \"%s\".", a.c_str()); + return false; + } + TSVConn vconn = TSHttpConnect(reinterpret_cast<sockaddr *>(&socket)); + assert(vconn != NULL); + TSCont contp = TSContCreate(Transaction::handle, NULL); + assert(contp != NULL); + Transaction *transaction = new Transaction(vconn, contp, i, l, t); + TSContDataSet(contp, transaction); + if (ti > 0) { + TSDebug(PLUGIN_TAG, "ats::get Setting active timeout to: %zu", ti); + transaction->timeout(ti); + } + return true; +} + +template <class T> +bool +get(io::IO *const i, const int64_t l, const T &t, const int64_t ti = 0) +{ + return get("127.0.0.1", i, l, t, ti); +} +} // end of ats namespace + +#endif // NEW_FETCHER_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/original-request.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/original-request.cc b/plugins/experimental/multiplexer/original-request.cc new file mode 100644 index 0000000..3ea76f9 --- /dev/null +++ b/plugins/experimental/multiplexer/original-request.cc @@ -0,0 +1,129 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <cstring> + +#include "dispatch.h" +#include "original-request.h" + +template <class T> +std::string +get(const TSMBuffer &b, const TSMLoc &l, const T &t) +{ + int length = 0; + const char *const buffer = t(b, l, &length); + + assert(buffer != NULL); + assert(length > 0); + assert(strlen(buffer) >= static_cast<unsigned int>(length)); + + return std::string(buffer, length); +} + +std::string +get(const TSMBuffer &b, const TSMLoc &l, const TSMLoc &f, const int i = 0) +{ + int length = 0; + const char *const buffer = TSMimeHdrFieldValueStringGet(b, l, f, i, &length); + + assert(buffer != NULL); + assert(length > 0); + assert(strlen(buffer) >= static_cast<unsigned int>(length)); + + return std::string(buffer, length); +} + +OriginalRequest::OriginalRequest(const TSMBuffer b, const TSMLoc l) : buffer_(b), location_(l) +{ + CHECK(TSHttpHdrUrlGet(b, l, &url_)); + + assert(url_ != NULL); + + const_cast<std::string &>(original.urlScheme) = get(buffer_, url_, TSUrlSchemeGet); + const_cast<std::string &>(original.urlHost) = get(buffer_, url_, TSUrlHostGet); + // TODO(dmorilha): handle port + + /* + * this code assumes the request has a single Host header + */ + hostHeader_ = TSMimeHdrFieldFind(b, l, TS_MIME_FIELD_HOST, TS_MIME_LEN_HOST); + assert(hostHeader_ != NULL); + + const_cast<std::string &>(original.hostHeader) = get(buffer_, location_, hostHeader_); + + xMultiplexerHeader_ = TSMimeHdrFieldFind(b, l, "X-Multiplexer", 13); + + if (xMultiplexerHeader_ != NULL) { + const_cast<std::string &>(original.xMultiplexerHeader) = get(buffer_, location_, xMultiplexerHeader_); + } +} + +OriginalRequest::~OriginalRequest(void) +{ + urlScheme(original.urlScheme); + urlHost(original.urlHost); + hostHeader(original.hostHeader); + if (!original.xMultiplexerHeader.empty()) { + xMultiplexerHeader(original.xMultiplexerHeader); + } + + TSHandleMLocRelease(buffer_, location_, hostHeader_); + TSHandleMLocRelease(buffer_, location_, url_); +} + +void +OriginalRequest::urlScheme(const std::string &s) +{ + assert(buffer_ != NULL); + assert(url_ != NULL); + const TSReturnCode result = TSUrlSchemeSet(buffer_, url_, s.c_str(), s.size()); + assert(result == TS_SUCCESS); +} + +void +OriginalRequest::urlHost(const std::string &s) +{ + assert(buffer_ != NULL); + assert(url_ != NULL); + CHECK(TSUrlHostSet(buffer_, url_, s.c_str(), s.size())); +} + +void +OriginalRequest::hostHeader(const std::string &s) +{ + assert(buffer_ != NULL); + assert(location_ != NULL); + assert(hostHeader_ != NULL); + CHECK(TSMimeHdrFieldValueStringSet(buffer_, location_, hostHeader_, 0, s.c_str(), s.size())); +} + +bool +OriginalRequest::xMultiplexerHeader(const std::string &s) +{ + assert(buffer_ != NULL); + assert(location_ != NULL); + if (xMultiplexerHeader_ == NULL) { + return false; + } + CHECK(TSMimeHdrFieldValueStringSet(buffer_, location_, xMultiplexerHeader_, 0, s.c_str(), s.size())); + return true; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/original-request.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/original-request.h b/plugins/experimental/multiplexer/original-request.h new file mode 100644 index 0000000..3814267 --- /dev/null +++ b/plugins/experimental/multiplexer/original-request.h @@ -0,0 +1,63 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef ORIGINAL_REQUEST_H +#define ORIGINAL_REQUEST_H + +#include <assert.h> +#include <string> +#include <ts/ts.h> + +/* + * on dispatch we get one parsed request. + * So we want to alter and modify it back the way it was originally. + */ +class OriginalRequest +{ + TSMBuffer buffer_; + TSMLoc location_; + TSMLoc url_; + TSMLoc hostHeader_; + TSMLoc xMultiplexerHeader_; + + OriginalRequest(const OriginalRequest &); + OriginalRequest &operator=(const OriginalRequest &); + +public: + struct { + const std::string hostHeader; + const std::string urlHost; + const std::string urlScheme; + const std::string xMultiplexerHeader; + } original; + + ~OriginalRequest(); + + OriginalRequest(const TSMBuffer, const TSMLoc); + + void urlScheme(const std::string &); + void urlHost(const std::string &); + void hostHeader(const std::string &); + bool xMultiplexerHeader(const std::string &); +}; + +#endif // ORIGINAL_REQUEST_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/post.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/post.cc b/plugins/experimental/multiplexer/post.cc new file mode 100644 index 0000000..87701d4 --- /dev/null +++ b/plugins/experimental/multiplexer/post.cc @@ -0,0 +1,138 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include <assert.h> +#include <limits> + +#include "post.h" + +#ifndef PLUGIN_TAG +#error Please define a PLUGIN_TAG before including this file. +#endif + +PostState::~PostState() +{ + if (buffer != NULL) { + TSIOBufferDestroy(buffer); + buffer = NULL; + } +} + +PostState::PostState(Requests &r) : buffer(NULL), reader(NULL), vio(NULL) +{ + assert(!r.empty()); + requests.swap(r); +} + +static void +postTransform(const TSCont c, PostState &s) +{ + assert(c != NULL); + + const TSVConn vconnection = TSTransformOutputVConnGet(c); + assert(vconnection != NULL); + + const TSVIO vio = TSVConnWriteVIOGet(c); + assert(vio != NULL); + + if (!s.buffer) { + s.buffer = TSIOBufferCreate(); + assert(s.buffer != NULL); + + const TSIOBufferReader reader = TSIOBufferReaderAlloc(s.buffer); + assert(reader != NULL); + + s.reader = TSIOBufferReaderClone(reader); + assert(s.reader != NULL); + + s.vio = TSVConnWrite(vconnection, c, reader, std::numeric_limits<int64_t>::max()); + assert(s.vio != NULL); + } + + if (!TSVIOBufferGet(vio)) { + TSVIONBytesSet(s.vio, TSVIONDoneGet(vio)); + TSVIOReenable(s.vio); + return; + } + + int64_t toWrite = TSVIONTodoGet(vio); + assert(toWrite >= 0); + + if (toWrite > 0) { + toWrite = std::min(toWrite, TSIOBufferReaderAvail(TSVIOReaderGet(vio))); + assert(toWrite >= 0); + + if (toWrite > 0) { + TSIOBufferCopy(TSVIOBufferGet(s.vio), TSVIOReaderGet(vio), toWrite, 0); + TSIOBufferReaderConsume(TSVIOReaderGet(vio), toWrite); + TSVIONDoneSet(vio, TSVIONDoneGet(vio) + toWrite); + } + } + + if (TSVIONTodoGet(vio) > 0) { + if (toWrite > 0) { + TSVIOReenable(s.vio); + CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_VCONN_WRITE_READY, vio)); + } + } else { + TSVIONBytesSet(s.vio, TSVIONDoneGet(vio)); + TSVIOReenable(s.vio); + CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_VCONN_WRITE_COMPLETE, vio)); + } +} + +int +handlePost(TSCont c, TSEvent e, void *data) +{ + assert(c != NULL); + // TODO(dmorilha): assert on possible events. + PostState *const state = static_cast<PostState *>(TSContDataGet(c)); + assert(state != NULL); + if (TSVConnClosedGet(c)) { + assert(data != NULL); + if (state->reader != NULL) { + addBody(state->requests, state->reader); + } + dispatch(state->requests, timeout); + delete state; + TSContDataSet(c, NULL); + TSContDestroy(c); + return 0; + } else { + switch (e) { + case TS_EVENT_ERROR: { + const TSVIO vio = TSVConnWriteVIOGet(c); + assert(vio != NULL); + CHECK(TSContCall(TSVIOContGet(vio), TS_EVENT_ERROR, vio)); + } break; + case TS_EVENT_VCONN_WRITE_COMPLETE: + TSVConnShutdown(TSTransformOutputVConnGet(c), 0, 1); + break; + + case TS_EVENT_VCONN_WRITE_READY: + default: + postTransform(c, *state); + break; + } + } + return 0; +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/post.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/post.h b/plugins/experimental/multiplexer/post.h new file mode 100644 index 0000000..dfc7be5 --- /dev/null +++ b/plugins/experimental/multiplexer/post.h @@ -0,0 +1,43 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef POST_H +#define POST_H + +#include <ts/ts.h> + +#include "dispatch.h" + +struct PostState { + Requests requests; + + TSIOBuffer buffer; + TSIOBufferReader reader; + TSVIO vio; + + ~PostState(); + PostState(Requests &); +}; + +int handlePost(TSCont, TSEvent, void *); + +#endif // POST_H http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/remap.config ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/remap.config b/plugins/experimental/multiplexer/remap.config new file mode 100644 index 0000000..7e453ad --- /dev/null +++ b/plugins/experimental/multiplexer/remap.config @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +map http://localhost http://localhost:8181 @plugin=ats-multiplexer.so @pparam=host1 @pparam=host2 +map http://host1 http://localhost:8181 +map http://host2 http://localhost:8181 http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ts.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/ts.cc b/plugins/experimental/multiplexer/ts.cc new file mode 100644 index 0000000..0a53cc9 --- /dev/null +++ b/plugins/experimental/multiplexer/ts.cc @@ -0,0 +1,39 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#include "ts.h" + +namespace ats +{ +namespace io +{ + IO * + IO::read(TSVConn v, TSCont c, const int64_t s) + { + assert(s > 0); + IO *io = new IO(); + io->vio = TSVConnRead(v, c, io->buffer, s); + return io; + } + +} // end of io namespace +} // end of ats namespace http://git-wip-us.apache.org/repos/asf/trafficserver/blob/99333962/plugins/experimental/multiplexer/ts.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/multiplexer/ts.h b/plugins/experimental/multiplexer/ts.h new file mode 100644 index 0000000..d5dfdf6 --- /dev/null +++ b/plugins/experimental/multiplexer/ts.h @@ -0,0 +1,67 @@ +/** @file + + Multiplexes request to other origins. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ +#ifndef TS_H +#define TS_H +#include <assert.h> +#include <cstring> +#include <limits> +#include <list> +#include <memory> +#include <string> +#include <ts/ts.h> + +namespace ats +{ +namespace io +{ + struct IO { + TSIOBuffer buffer; + TSIOBufferReader reader; + TSVIO vio; + + ~IO() + { + assert(buffer != NULL); + assert(reader != NULL); + const int64_t available = TSIOBufferReaderAvail(reader); + if (available > 0) { + TSIOBufferReaderConsume(reader, available); + } + TSIOBufferReaderFree(reader); + TSIOBufferDestroy(buffer); + } + + IO(void) : buffer(TSIOBufferCreate()), reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) {} + IO(const TSIOBuffer &b) : buffer(b), reader(TSIOBufferReaderAlloc(buffer)), vio(NULL) { assert(buffer != NULL); } + static IO *read(TSVConn, TSCont, const int64_t); + + static IO * + read(TSVConn v, TSCont c) + { + return IO::read(v, c, std::numeric_limits<int64_t>::max()); + } + }; + +} // end of io namespace +} // end of ats namespace +#endif // TS_H
