Repository: trafficserver Updated Branches: refs/heads/fcollapse [created] d2bdb0bb0
[TS-4243] Collapsed Forwarding Plugin based on Open Write Fail Action feature. Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/d2bdb0bb Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/d2bdb0bb Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/d2bdb0bb Branch: refs/heads/fcollapse Commit: d2bdb0bb03a0835a29b5d1ef3c424238c9cfc73c Parents: 5731dbb Author: Sudheer Vinukonda <[email protected]> Authored: Mon Feb 29 18:09:42 2016 +0000 Committer: Sudheer Vinukonda <[email protected]> Committed: Mon Feb 29 18:09:42 2016 +0000 ---------------------------------------------------------------------- configure.ac | 1 + .../collapsed_forwarding/Makefile.am | 21 ++ .../experimental/collapsed_forwarding/README | 46 +++ .../collapsed_forwarding.cc | 318 +++++++++++++++++++ 4 files changed, 386 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d2bdb0bb/configure.ac ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index b9981a6..530d8c7 100644 --- a/configure.ac +++ b/configure.ac @@ -1942,6 +1942,7 @@ AC_CONFIG_FILES([ plugins/experimental/generator/Makefile plugins/experimental/geoip_acl/Makefile plugins/experimental/header_normalize/Makefile + plugins/experimental/collapsed_forwarding/Makefile plugins/experimental/hipes/Makefile plugins/experimental/inliner/Makefile plugins/experimental/memcache/Makefile http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d2bdb0bb/plugins/experimental/collapsed_forwarding/Makefile.am ---------------------------------------------------------------------- diff --git a/plugins/experimental/collapsed_forwarding/Makefile.am b/plugins/experimental/collapsed_forwarding/Makefile.am new file mode 100644 index 0000000..6e46a92 --- /dev/null +++ 
b/plugins/experimental/collapsed_forwarding/Makefile.am @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include $(top_srcdir)/build/plugins.mk + +pkglib_LTLIBRARIES = collapsed_forwarding.la +collapsed_forwarding_la_SOURCES = collapsed_forwarding.cc +collapsed_forwarding_la_LDFLAGS = $(TS_PLUGIN_LDFLAGS) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d2bdb0bb/plugins/experimental/collapsed_forwarding/README ---------------------------------------------------------------------- diff --git a/plugins/experimental/collapsed_forwarding/README b/plugins/experimental/collapsed_forwarding/README new file mode 100644 index 0000000..72fe545 --- /dev/null +++ b/plugins/experimental/collapsed_forwarding/README @@ -0,0 +1,46 @@ +//////////////////////////////////////////////////////////////////////////////// +// collapsed_forwarding:: +// +// ATS plugin to allow collapsed forwarding of concurrent requests for the same +// object. This plugin is based on open_write_fail_action feature, which detects +// cache open write failure on a cache miss and returns a 502 error along with a +// special @-header indicating the reason for 502 error. 
The plugin acts on the +// error by using an internal redirect follow back to itself, essentially blocking +// the request until a response arrives, at which point, relies on read-while-writer +// feature to start downloading the object to all waiting clients. The following +// config parameters are assumed to be set for this plugin to work: +//////////////////////////////////////////////////////////////////////////////////// +// proxy.config.http.cache.open_write_fail_action 1 ///////////////////////// +// proxy.config.cache.enable_read_while_writer 1 ///////////////////////// +// proxy.config.http.redirection_enabled 1 ///////////////////////// +// proxy.config.http.number_of_redirections 10 ///////////////////////// +// proxy.config.http.redirect_use_orig_cache_key 1 ///////////////////////// +// proxy.config.http.background_fill_active_timeout 0 ///////////////////////// +// proxy.config.http.background_fill_completed_threshold 0 ///////////////////////// +//////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////// +// This plugin currently supports only per-remap mode activation. +//////////////////////////////////////////////////////////////////////////////////// + +More details are available at + +https://docs.trafficserver.apache.org/en/6.0.x/admin/http-proxy-caching.en.html#reducing-origin-server-requests-avoiding-the-thundering-herd + +Installation: + + make + sudo make install + +If you don't have the traffic server binaries in your path, then you will need +to specify the path to tsxs manually: + + make TSXS=/opt/trafficserver/bin/tsxs + sudo make TSXS=/opt/trafficserver/bin/tsxs install + +Configuration: + + Add @plugin=collapsed_forwarding.so to your remap.config rules. 
+ + Global activation through plugin.config is not supported; this plugin + currently works only in per-remap mode (remap.config). + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d2bdb0bb/plugins/experimental/collapsed_forwarding/collapsed_forwarding.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/collapsed_forwarding/collapsed_forwarding.cc b/plugins/experimental/collapsed_forwarding/collapsed_forwarding.cc new file mode 100644 index 0000000..5606d6b --- /dev/null +++ b/plugins/experimental/collapsed_forwarding/collapsed_forwarding.cc @@ -0,0 +1,318 @@ +/** @file + + Plugin to collapse concurrent requests for the same object into a single + origin request, based on the open_write_fail_action feature. + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +//////////////////////////////////////////////////////////////////////////////// +// collapsed_forwarding:: +// +// ATS plugin to allow collapsed forwarding of concurrent requests for the same +// object. This plugin is based on open_write_fail_action feature, which detects +// cache open write failure on a cache miss and returns a 502 error along with a +// special @-header indicating the reason for 502 error. 
The plugin acts on the +// error by using an internal redirect follow back to itself, essentially blocking +// the request until a response arrives, at which point, relies on read-while-writer +// feature to start downloading the object to all waiting clients. The following +// config parameters are assumed to be set for this plugin to work: +//////////////////////////////////////////////////////////////////////////////////// +// proxy.config.http.cache.open_write_fail_action 1 ///////////////////////// +// proxy.config.cache.enable_read_while_writer 1 ///////////////////////// +// proxy.config.http.redirection_enabled 1 ///////////////////////// +// proxy.config.http.number_of_redirections 10 ///////////////////////// +// proxy.config.http.redirect_use_orig_cache_key 1 ///////////////////////// +// proxy.config.http.background_fill_active_timeout 0 ///////////////////////// +// proxy.config.http.background_fill_completed_threshold 0 ///////////////////////// +//////////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////////// +// This plugin currently supports only per-remap mode activation. 
+//////////////////////////////////////////////////////////////////////////////////// + +#define UNUSED __attribute__((unused)) +static char UNUSED rcsId__header_normalize_cc[] = + "@(#) $Id: collapsed_forwarding.cc 218 2016-02-26 01:29:16Z sudheerv $ built on " __DATE__ " " __TIME__; + + +#include <sys/time.h> +#include <ts/ts.h> +#include <ts/remap.h> +#include <set> +#include <string> +#include <string.h> +#include <stdlib.h> +#include <stdint.h> +#include <stdio.h> +#include <stdarg.h> +#include <getopt.h> +#include <netdb.h> +#include <map> + +const char *DEBUG_TAG = (char*)"collapsed_forwarding"; + +const char *LOCATION_HEADER = "Location"; +const char *REDIRECT_REASON = "See Other"; +const char *ATS_INTERNAL_MESSAGE = "@Ats-Internal"; + +int OPEN_WRITE_FAIL_MAX_REQ_DELAY_RETRIES = 5; +int OPEN_WRITE_FAIL_REQ_DELAY_TIMEOUT = 500; + +typedef struct _RequestData { + TSHttpTxn txnp; + int wl_retry; // write lock failure retry count + std::string req_url; +} RequestData; + +int +add_redirect_header(TSMBuffer& bufp, TSMLoc& hdr_loc, const std::string& location) +{ + // This is needed in case the response already contains a Location header + TSMLoc field_loc = TSMimeHdrFieldFind(bufp, hdr_loc, LOCATION_HEADER, strlen(LOCATION_HEADER)); + + if (field_loc == TS_NULL_MLOC) { + TSMimeHdrFieldCreateNamed(bufp, hdr_loc, LOCATION_HEADER, strlen(LOCATION_HEADER), &field_loc); + } + + if (TS_SUCCESS == TSMimeHdrFieldValueStringSet(bufp, hdr_loc, field_loc, -1, location.c_str(), location.size())) { + TSDebug(DEBUG_TAG, "Adding Location header %s", LOCATION_HEADER); + TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); + } + + TSHandleMLocRelease(bufp, hdr_loc, field_loc); + + TSHttpHdrStatusSet(bufp, hdr_loc, TS_HTTP_STATUS_SEE_OTHER); + TSHttpHdrReasonSet(bufp, hdr_loc, REDIRECT_REASON, strlen(REDIRECT_REASON)); + return TS_SUCCESS; +} + +bool +check_internal_message_hdr(TSHttpTxn& txnp) +{ + TSMBuffer bufp; + TSMLoc hdr_loc; + bool found = false; + + if 
(TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) { + TSError("check_internal_message_hdr: couldn't retrieve client response header"); + return false; + } + + TSMLoc header_loc = TSMimeHdrFieldFind(bufp, hdr_loc, ATS_INTERNAL_MESSAGE, strlen(ATS_INTERNAL_MESSAGE)); + if (header_loc) { + found = true; + // found the header, remove it now.. + TSMimeHdrFieldDestroy(bufp, hdr_loc, header_loc); + TSHandleMLocRelease(bufp, hdr_loc, header_loc); + } + + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + + return found; +} + +int +on_OS_DNS(const RequestData* req, TSHttpTxn& txnp) +{ + if (req->wl_retry > 0) { + TSDebug (DEBUG_TAG, "OS_DNS request delayed %d times, block origin req for url: %s", req->wl_retry, req->req_url.c_str()); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); + return TS_SUCCESS; + } + + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +int +on_send_request_header(const RequestData* req, TSHttpTxn& txnp) +{ + if (req->wl_retry > 0) { + TSDebug (DEBUG_TAG, "Send_Req request delayed %d times, block origin req for url: %s", req->wl_retry, req->req_url.c_str()); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); + return TS_SUCCESS; + } + + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +int +on_read_response_header(TSHttpTxn& txnp) +{ + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +int +on_immediate(RequestData* req, TSCont& contp) +{ + if (!req) { + TSError ("%s: invalid req_data", DEBUG_TAG); + return TS_SUCCESS; + } + + TSDebug (DEBUG_TAG, "continuation delayed, scheduling now..for url: %s", req->req_url.c_str()); + + // add retry_done header to prevent looping + std::string value; + TSMBuffer bufp; + TSMLoc hdr_loc; + if (TSHttpTxnClientRespGet(req->txnp, &bufp, &hdr_loc) != TS_SUCCESS) { + TSError("plugin=%s, level=error, error_code=could_not_retrieve_client_response_header for url %s", DEBUG_TAG, req->req_url.c_str()); + TSHttpTxnReenable(req->txnp, 
TS_EVENT_HTTP_ERROR); + return TS_SUCCESS; + } + + add_redirect_header(bufp, hdr_loc, req->req_url); + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + TSHttpTxnReenable(req->txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +int +on_send_response_header(RequestData* req, TSHttpTxn& txnp, TSCont& contp) +{ + TSMBuffer bufp; + TSMLoc hdr_loc; + if (TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) { + TSError("plugin=%s, level=error, error_code=could_not_retrieve_client_response_header", DEBUG_TAG); + return TS_SUCCESS; + } + + TSHttpStatus status = TSHttpHdrStatusGet(bufp, hdr_loc); + TSDebug(DEBUG_TAG, "Response code: %d", status); + + if ((status == TS_HTTP_STATUS_BAD_GATEWAY) || (status == TS_HTTP_STATUS_SEE_OTHER)) { + bool is_internal_message_hdr = check_internal_message_hdr(txnp); + bool delay_request = is_internal_message_hdr || + ((req->wl_retry > 0) && (req->wl_retry < OPEN_WRITE_FAIL_MAX_REQ_DELAY_RETRIES)); + + if (delay_request) { + req->wl_retry++; + TSDebug (DEBUG_TAG, "delaying request, url@%p: {{%s}} on retry: %d time", txnp, req->req_url.c_str(), req->wl_retry); + TSContSchedule(contp, OPEN_WRITE_FAIL_REQ_DELAY_TIMEOUT, TS_THREAD_POOL_TASK); + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + return TS_SUCCESS; + } + } + + if (req->wl_retry > 0) { + TSDebug (DEBUG_TAG, "request delayed, but unsuccessful, url@%p: {{%s}} on retry: %d time", txnp, req->req_url.c_str(), req->wl_retry); + req->wl_retry = 0; + } + + // done..cleanup + TSfree(req); + TSContDestroy(contp); + + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +int +collapsed_cont(TSCont contp, TSEvent event, void *edata) +{ + TSHttpTxn txnp = static_cast<TSHttpTxn>(edata); + RequestData *my_req = static_cast<RequestData*>(TSContDataGet(contp)); + + switch (event) { + case TS_EVENT_HTTP_OS_DNS: + { + return on_OS_DNS(my_req, txnp); + } + + case TS_EVENT_HTTP_SEND_REQUEST_HDR: + { + return 
on_send_request_header(my_req, txnp); + } + + case TS_EVENT_HTTP_READ_RESPONSE_HDR: + { + return on_read_response_header(txnp); + } + case TSEvent::TS_EVENT_IMMEDIATE: + case TSEvent::TS_EVENT_TIMEOUT: + { + return on_immediate(my_req, contp); + } + case TS_EVENT_HTTP_SEND_RESPONSE_HDR: + { + return on_send_response_header(my_req, txnp, contp); + } + default: + { + TSDebug(DEBUG_TAG, "Unexpected event: %d", event); + break; + } + } + + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return TS_SUCCESS; +} + +TSReturnCode +TSRemapInit(TSRemapInterface * /* api_info */, char * /* errbuf */, int /* errbuf_size */) +{ + TSDebug(DEBUG_TAG, "plugin is succesfully initialized"); + return TS_SUCCESS; +} + +TSReturnCode +TSRemapNewInstance(int argc, char * argv[], void ** /* ih */, char * /* errbuf */, int /* errbuf_size */) +{ + // basic argv processing.. + for (int i = 2; i < argc; ++i) { + if (strncmp(argv[i], "--delay=", 8) == 0) { + OPEN_WRITE_FAIL_REQ_DELAY_TIMEOUT = atoi((char *)(argv[i] + 8)); + } else if (strncmp(argv[i], "--retries=", 10) == 0) { + OPEN_WRITE_FAIL_MAX_REQ_DELAY_RETRIES = atoi((char *)(argv[i] + 10)); + } + } + + return TS_SUCCESS; +} + +TSRemapStatus +TSRemapDoRemap(void* ih, TSHttpTxn rh, TSRemapRequestInfo *rri) +{ + TSCont cont = TSContCreate(collapsed_cont, TSMutexCreate()); + + RequestData *req_data; + req_data = static_cast<RequestData *>(TSmalloc(sizeof(RequestData))); + memset(req_data, 0, sizeof(RequestData)); + + req_data->txnp = rh; + req_data->wl_retry = 0; + + int url_len = 0; + char *url = TSHttpTxnEffectiveUrlStringGet(rh, &url_len); + req_data->req_url = std::string(url, url_len); + + TSfree(url); + TSContDataSet(cont, req_data); + + TSHttpTxnHookAdd(rh, TS_HTTP_SEND_REQUEST_HDR_HOOK, cont); + TSHttpTxnHookAdd(rh, TS_HTTP_SEND_RESPONSE_HDR_HOOK, cont); + TSHttpTxnHookAdd(rh, TS_HTTP_READ_RESPONSE_HDR_HOOK, cont); + TSHttpTxnHookAdd(rh, TS_HTTP_OS_DNS_HOOK, cont); + + return TSREMAP_DID_REMAP; +}
