Updated Branches: refs/heads/master c55e396bb -> 0c7c43dca
TS-1596 - Added channel_stats plugin to experimental. Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/de35ad0a Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/de35ad0a Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/de35ad0a Branch: refs/heads/master Commit: de35ad0abdf579a52ea765eaceba898dcc1e0b73 Parents: c55e396 Author: Phil Sorber <[email protected]> Authored: Sat Dec 1 16:13:40 2012 -0500 Committer: Phil Sorber <[email protected]> Committed: Sat Dec 1 16:53:46 2012 -0500 ---------------------------------------------------------------------- CHANGES | 2 + configure.ac | 1 + plugins/experimental/Makefile.am | 3 +- plugins/experimental/channel_stats/Makefile.am | 18 + plugins/experimental/channel_stats/Makefile.tsxs | 24 + plugins/experimental/channel_stats/README | 90 ++ .../experimental/channel_stats/channel_stats.cc | 806 +++++++++++++++ plugins/experimental/channel_stats/debug_macros.h | 77 ++ 8 files changed, 1020 insertions(+), 1 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 0a8a6ca..2af471c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,8 @@ -*- coding: utf-8 -*- Changes with Apache Traffic Server 3.3.1 + *) [TS-1596] Added channel_stats plugin to experimental. + *) [TS-1607] decouple SSL certificate lookup *) [TS-1506] %<cquuh> log symbol will crash TS when requesting a SSL url. 
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/configure.ac ---------------------------------------------------------------------- diff --git a/configure.ac b/configure.ac index 29dff32..a5455d5 100644 --- a/configure.ac +++ b/configure.ac @@ -1614,6 +1614,7 @@ AC_CONFIG_FILES([plugins/experimental/header_rewrite/Makefile]) AC_CONFIG_FILES([plugins/experimental/metalink/Makefile]) AC_CONFIG_FILES([plugins/experimental/gzip/Makefile]) AC_CONFIG_FILES([plugins/experimental/spdy/Makefile]) +AC_CONFIG_FILES([plugins/experimental/channel_stats/Makefile]) # various tools AC_CONFIG_FILES([tools/Makefile]) # example plugins http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/Makefile.am ---------------------------------------------------------------------- diff --git a/plugins/experimental/Makefile.am b/plugins/experimental/Makefile.am index 025ebce..fae28a7 100644 --- a/plugins/experimental/Makefile.am +++ b/plugins/experimental/Makefile.am @@ -24,5 +24,6 @@ SUBDIRS = \ header_rewrite \ metalink \ gzip \ - spdy + spdy \ + channel_stats endif http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.am ---------------------------------------------------------------------- diff --git a/plugins/experimental/channel_stats/Makefile.am b/plugins/experimental/channel_stats/Makefile.am new file mode 100644 index 0000000..e2e6872 --- /dev/null +++ b/plugins/experimental/channel_stats/Makefile.am @@ -0,0 +1,18 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +AM_CPPFLAGS = -I$(top_builddir)/proxy/api -I$(top_srcdir)/proxy/api + +pkglibdir = ${pkglibexecdir} +pkglib_LTLIBRARIES = channel_stats.la +channel_stats_la_SOURCES = channel_stats.cc +channel_stats_la_LDFLAGS = -module -avoid-version -shared http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/Makefile.tsxs ---------------------------------------------------------------------- diff --git a/plugins/experimental/channel_stats/Makefile.tsxs b/plugins/experimental/channel_stats/Makefile.tsxs new file mode 100644 index 0000000..aa84569 --- /dev/null +++ b/plugins/experimental/channel_stats/Makefile.tsxs @@ -0,0 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +TSXS?=tsxs + +%.so: %.cc + $(TSXS) -C $< -o $@ + +all: channel_stats.so + +install: all + $(TSXS) -i -o channel_stats.so + +clean: + rm -f *.lo *.so http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/README ---------------------------------------------------------------------- diff --git a/plugins/experimental/channel_stats/README b/plugins/experimental/channel_stats/README new file mode 100644 index 0000000..873e151 --- /dev/null +++ b/plugins/experimental/channel_stats/README @@ -0,0 +1,90 @@ +channel_stats plugin for Apache Traffic Server 3.0.0+ + +About channel_stats plugin +========================== +The channel_stats plugin collects runtime statistics (speed, request count and +more in the future) for each channel. The stats are exposed through an HTTP interface +in JSON. (The interface code comes from the 'stats_over_http' plugin.) + +In general, the plugin can be used on a reverse proxy with a fixed number of +remap rules. It is not designed for a proxy that serves unlimited channels (e.g. an +open-relay forward proxy). + + +Installation +========================== + make -f Makefile.tsxs + sudo make install -f Makefile.tsxs +(if 'tsxs' is not in your PATH, run make by appending TSXS=/path/to/ts/bin/tsxs) + +Add 'channel_stats.so' to plugin.config. By default, the path of the HTTP interface +is '_cstats'. For safety, you should change it by adding a parameter after +'channel_stats.so'. Example: 'channel_stats.so _my_cstats'.
+ +Restart Traffic Server: sudo traffic_line -L or sudo trafficserver restart + + +Viewing Stats +========================== +Visit http://local_IP:port/_cstats (or the path you configured in plugin.config). +Output will be in JSON: + +{ + "channel": { + "www.example.com": { + "response.bytes.content": "3486995502046", + "response.count.2xx.get": "64040675", + "speed.ua.bytes_per_sec_64k": "3972287" + }, + "www.test.com": { + "response.bytes.content": "3349404916760", + "response.count.2xx.get": "64038172", + "speed.ua.bytes_per_sec_64k": "3989255" + } + }, + "global": { + "response.count.2xx.get": "268516715", + "server": "3.0.4" + } +} + +Available stats: + - response.bytes.content: transferred content length (not including header) + - response.count.2xx.get: count of GET transactions answered with 200 or 206 + - speed.ua.bytes_per_sec_64k: count of transactions whose speed is < 64KBps (64000 bytes/sec) + +Additional parameters: + - topn: output only the top N channels, ordered by response count + - channel: output only the channels whose name contains the given string + - global: also display TS internal stats, as the 'stats_over_http' plugin does + Example: + - http://127.0.0.1/_cstats?global + - http://127.0.0.1/_cstats?topn=5 + - http://127.0.0.1/_cstats?channel=test.com + - http://127.0.0.1/_cstats?channel=test.com&topn=5&global +If you have a large number of channels (e.g. more than 10k), those parameters +should not be used heavily because of the extra overhead. + + +Warning +========================== +Security + - As mentioned above, you should change the default path of the HTTP interface to make + it harder for other people to access your channel stats. + - For IPv4, the plugin makes sure the visitor is from a private network. + (http://en.wikipedia.org/wiki/Private_network#Private_IPv4_address_spaces) + In addition, you should deny requests to the HTTP interface on your L7 front- + end because your L7 server and TS are probably in the same local network. + Currently the plugin doesn't check IPv6 addresses. (should disable IPv6 by default?)
+ - The number of channels is limited to 100000. + + +DEV +========================== + + +ChangeLog +========================== + +Version 0.1 (11/26/12) + - Initial release http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/channel_stats.cc ---------------------------------------------------------------------- diff --git a/plugins/experimental/channel_stats/channel_stats.cc b/plugins/experimental/channel_stats/channel_stats.cc new file mode 100644 index 0000000..7cca4e7 --- /dev/null +++ b/plugins/experimental/channel_stats/channel_stats.cc @@ -0,0 +1,806 @@ +/* + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +// get INT64_MAX +#define __STDC_LIMIT_MACROS + +#include <cstdio> +#include <cstring> +#include <cctype> +#include <string> +#include <map> // may optimize by using hash_map, but mind compiler portability +#include <vector> +#include <algorithm> +#include <sstream> +#include <arpa/inet.h> + +#include <ts/ts.h> +#if (TS_VERSION_NUMBER < 3003001) +// get TSHttpTxnStartTimeGet +#include <ts/experimental.h> +#endif + +#include "debug_macros.h" + +#define PLUGIN_NAME "channel_stats" +#define PLUGIN_VERSION "0.1" + +#define MAX_SPEED 999999999 + +/* limit the number of channels (items) to avoid potential attack, + regex_map rule can also generate infinite channels (hosts) */ +#define MAX_MAP_SIZE 100000 + +static std::string api_path("_cstats"); + +// global stats +static uint64_t global_response_count_2xx_get = 0; // 2XX GET response count +static uint64_t global_response_bytes_content = 0; // transferred bytes (2xx.get) + +// channel stats +struct channel_stat { + channel_stat() + : response_bytes_content(0), + response_count_2xx(0), + speed_ua_bytes_per_sec_64k(0) { + } + + inline void increment(uint64_t rbc, uint64_t rc2, uint64_t sbps6) { + __sync_fetch_and_add(&response_bytes_content, rbc); + if (rc2) __sync_fetch_and_add(&response_count_2xx, rc2); + if (sbps6) __sync_fetch_and_add(&speed_ua_bytes_per_sec_64k, sbps6); + } + + inline void debug_channel() { + debug("response.bytes.content: %" PRIu64 "", response_bytes_content); + debug("response.count.2xx: %" PRIu64 "", response_count_2xx); + debug("speed.ua.bytes_per_sec_64k: %" PRIu64 "", speed_ua_bytes_per_sec_64k); + } + + uint64_t response_bytes_content; + uint64_t response_count_2xx; + uint64_t speed_ua_bytes_per_sec_64k; +}; + +typedef std::map<std::string, channel_stat *> stats_map_type; +typedef stats_map_type::iterator smap_iterator; + +static stats_map_type channel_stats; +static TSMutex stats_map_mutex; + +// api Intercept Data +typedef struct intercept_state_t +{ + TSVConn net_vc; + TSVIO 
read_vio; + TSVIO write_vio; + + TSIOBuffer req_buffer; + TSIOBuffer resp_buffer; + TSIOBufferReader resp_reader; + + int output_bytes; + int body_written; + + int show_global; // default 0 + char * channel; // default "" + int topn; // default -1 + int deny; // default 0 +} intercept_state; + +struct private_seg_t { + struct in_addr net; + struct in_addr mask; +}; + +// don't put inet_addr("255.255.255.255"), see BUGS in 'man 3 inet_addr' +static struct private_seg_t private_segs[] = { + {{inet_addr("10.0.0.0")}, {inet_addr("255.0.0.0")}}, + {{inet_addr("127.0.0.0")}, {inet_addr("255.0.0.0")}}, + {{inet_addr("172.16.0.0")}, {inet_addr("255.240.0.0")}}, + {{inet_addr("192.168.0.0")}, {inet_addr("255.255.0.0")}} +}; +static int num_private_segs = sizeof(private_segs) / sizeof(private_seg_t); + +// all parameters are in network byte order +static int +is_in_net (const struct in_addr * addr, + const struct in_addr * netaddr, + const struct in_addr * netmask) +{ + if ((addr->s_addr & netmask->s_addr) == (netaddr->s_addr & netmask->s_addr)) + return 1; + return 0; +} + +static int +is_private_ip(const struct in_addr * addr) +{ + int i; + for (i = 0; i < num_private_segs; i++) { + if (is_in_net(addr, &private_segs[i].net, &private_segs[i].mask)) + return 1; + } + return 0; +} + +static int handle_event(TSCont contp, TSEvent event, void *edata); +static int api_handle_event(TSCont contp, TSEvent event, void *edata); + +/* + Get the value of parameter in url querystring + Return 0 and a null string if not find the parameter. + Return 1 and a value string, normally + Return 2 and a max_length value string if the length of the value exceeds. 
+ + Possible appearance: ?param=value&fake_param=value&param=value +*/ +static int +get_query_param(const char *query, const char *param, + char *result, int max_length) +{ + char *pos = 0; + + pos = strstr(query, param); // try to find in querystring of url + if (pos != query) { + // if param is not prefix of querystring + while (pos && *(pos - 1) != '&') { // param must be after '&' + pos = strstr(pos + strlen(param), param); // try next + } + } + + if (!pos) { + // set it null string if not found + result[0] = '\0'; + return 0; + } + + pos += strlen(param); // skip 'param=' + + // copy value of param + int now = 0; + while (*pos != '\0' && *pos != '&' && now < max_length) { + result[now++] = *pos; + pos++; + } + result[now] = '\0'; // make sure null-terminated + + if (*pos != '\0' && *pos != '&' && now == max_length) + return 2; + else + return 1; +} + +/* + check if exist param in query string + + Possible querystring: ?param1=value1&param2 + (param2 is a param which "has_no_value") +*/ +static int +has_query_param(const char *query, const char *param, int has_no_value) +{ + char *pos = 0; + + pos = strstr(query, param); // try to find in querystring of url + if (pos != query) { + // if param is not prefix of querystring + while (pos && *(pos - 1) != '&') { // param must be after '&' + pos = strstr(pos + strlen(param), param); // try next + } + } + + if (!pos) + return 0; + + pos += strlen(param); // skip 'param=' + + if (has_no_value) { + if (*pos == '\0' || *pos == '&') return 1; + } else { + if (*pos == '=') return 1; + } + + return 0; +} + +static void +get_api_params(TSMBuffer bufp, + TSMLoc url_loc, + int * show_global, + char ** channel, + int * topn) +{ + const char * query; // not null-terminated, get from TS api + char * tmp_query = NULL; // null-terminated + int query_len = 0; + + *show_global = 0; + *topn = -1; + + query = TSUrlHttpQueryGet(bufp, url_loc, &query_len); + if (query_len == 0) + return; + tmp_query = TSstrndup(query, query_len); +
debug_api("querystring: %s", tmp_query); + + if (has_query_param(tmp_query, "global", 1)) { + debug_api("found 'global' param"); + *show_global = 1; + } + + *channel = (char *) TSmalloc(query_len); + if (get_query_param(tmp_query, "channel=", *channel, query_len)) { + debug_api("found 'channel' param: %s", *channel); + } + + std::stringstream ss; + char * tmp_topn = (char *) TSmalloc(query_len); + if (get_query_param(tmp_query, "topn=", tmp_topn, 10)) { + if (strlen(tmp_topn) > 0) { + ss.str(tmp_topn); + ss >> *topn; + } + debug_api("found 'topn' param: %d", *topn); + } + + TSfree(tmp_query); + TSfree(tmp_topn); +} + +static void +handle_read_req(TSCont contp, TSHttpTxn txnp) +{ + TSMBuffer bufp; + TSMLoc hdr_loc = NULL; + TSMLoc url_loc = NULL; + const char *method; + int method_length = 0; + TSCont txn_contp; + + const char * path; + int path_len; + struct sockaddr * client_addr; + struct sockaddr_in * client_addr4; + TSCont api_contp; + char * client_ip; + intercept_state *api_state; + + if (TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) { + error("couldn't retrieve client's request"); + goto cleanup; + } + + method = TSHttpHdrMethodGet(bufp, hdr_loc, &method_length); + if (0 != strncmp(method, TS_HTTP_METHOD_GET, method_length)) { + debug("do not count %.*s method", method_length, method); + goto cleanup; + } + + if (TSHttpHdrUrlGet(bufp, hdr_loc, &url_loc) != TS_SUCCESS) + goto cleanup; + + path = TSUrlPathGet(bufp, url_loc, &path_len); + if (path_len == 0 || (unsigned)path_len != api_path.length() || + strncmp(api_path.c_str(), path, path_len) != 0) { + goto not_api; + } + + // register our intercept + debug_api("Intercepting request"); + api_state = (intercept_state *) TSmalloc(sizeof(*api_state)); + memset(api_state, 0, sizeof(*api_state)); + get_api_params(bufp, url_loc, + &api_state->show_global, &api_state->channel, + &api_state->topn); + + // check private ip + client_addr = (struct sockaddr *) TSHttpTxnClientAddrGet(txnp); + if 
(client_addr->sa_family == AF_INET) { + client_addr4 = (struct sockaddr_in *) client_addr; + if (!is_private_ip(&client_addr4->sin_addr)) { + client_ip = (char *) TSmalloc(INET_ADDRSTRLEN); + inet_ntop(AF_INET, &client_addr4->sin_addr, client_ip, INET_ADDRSTRLEN); + debug_api("%s is not a private IP, request denied", client_ip); + api_state->deny = 1; + TSfree(client_ip); + } + } else { + debug_api("not IPv4, ignore IP auth"); // TODO check AF_INET6 private IP? + } + + TSSkipRemappingSet(txnp, 1); //not strictly necessary + + api_contp = TSContCreate(api_handle_event, TSMutexCreate()); + TSContDataSet(api_contp, api_state); + TSHttpTxnIntercept(api_contp, txnp); + + goto cleanup; + +not_api: + + txn_contp = TSContCreate(handle_event, NULL); // reuse global hander + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp); + +cleanup: + if (url_loc) TSHandleMLocRelease(bufp, hdr_loc, url_loc); + if (hdr_loc) TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); +} + +static void +handle_txn_close(TSCont contp, TSHttpTxn txnp) +{ + TSMBuffer bufp; + TSMLoc hdr_loc; + TSHttpStatus status; + TSMLoc purl_loc; + const char * pristine_host; + int pristine_host_len = 0; + int pristine_port; + uint64_t user_speed; + uint64_t body_bytes; + TSHRTime start_time = 0; + TSHRTime end_time = 0; + TSHRTime interval_time = 0; + smap_iterator stat_it; + channel_stat *stat; + std::pair<smap_iterator, bool> insert_ret; + std::string host; + std::stringstream ss; + + if (TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc) != TS_SUCCESS) { + debug("couldn't retrieve final response"); + return; + } + + status = TSHttpHdrStatusGet(bufp, hdr_loc); + if (status != TS_HTTP_STATUS_OK && status != TS_HTTP_STATUS_PARTIAL_CONTENT) { + debug("only count 200/206 response"); + goto cleanup; + } + + if (TSHttpTxnPristineUrlGet(txnp, &bufp, &purl_loc) != TS_SUCCESS) { + debug("couldn't retrieve pristine url"); + goto cleanup; + } + + pristine_host = TSUrlHostGet(bufp, purl_loc, &pristine_host_len); + if 
(pristine_host_len == 0) { + debug("couldn't retrieve pristine host"); + goto cleanup; + } + pristine_port = TSUrlPortGet(bufp, purl_loc); + host = std::string(pristine_host, pristine_host_len); + if (pristine_port != 80) { + ss << pristine_port; + host += ":" + ss.str(); + } + + body_bytes = TSHttpTxnClientRespBodyBytesGet(txnp); + __sync_fetch_and_add(&global_response_count_2xx_get, 1); + __sync_fetch_and_add(&global_response_bytes_content, body_bytes); + + debug("pristine host: %.*s", pristine_host_len, pristine_host); + debug("pristine port: %d", pristine_port); + debug("host to lookup: %s", host.c_str()); + debug("body bytes: %" PRIu64 "", body_bytes); + debug("2xx req count: %" PRIu64 "", global_response_count_2xx_get); + +#if (TS_VERSION_NUMBER < 3003001) + TSHttpTxnStartTimeGet(txnp, &start_time); + TSHttpTxnEndTimeGet(txnp, &end_time); +#else + TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_BEGIN, &start_time); + TSHttpTxnMilestoneGet(txnp, TS_MILESTONE_UA_CLOSE, &end_time); +#endif + + if (start_time != 0 && end_time != 0 && end_time >= start_time) { + interval_time = end_time - start_time; + } else { + warning("not valid time, start: %" PRId64", end: %" PRId64"", start_time, end_time); + goto cleanup; + } + + if (interval_time == 0 || body_bytes == 0) + user_speed = MAX_SPEED; + else + user_speed = (int)((float)body_bytes / interval_time * HRTIME_SECOND); + + debug("start time: %" PRId64 "", start_time); + debug("end time: %" PRId64 "", end_time); + debug("interval time: %" PRId64 "", interval_time); + debug("interval seconds: %.5f", interval_time / (float)HRTIME_SECOND); + debug("speed bytes per second: %" PRIu64 "", user_speed); + + /* + // test large number of channels + if (channel_stats.size() < MAX_MAP_SIZE) + ss << channel_stats.size() + 1; + else + ss << (rand() % MAX_MAP_SIZE + 1); + host = host + "--" + ss.str(); + debug("%s", host.c_str()); + */ + + stat_it = channel_stats.find(host); + if (stat_it == channel_stats.end()) { + if 
(channel_stats.size() >= MAX_MAP_SIZE) { + warning("channel_stats map exceeds max size"); + goto cleanup; + } + stat = new channel_stat(); + TSMutexLock(stats_map_mutex); + insert_ret = channel_stats.insert(std::make_pair(host, stat)); + TSMutexUnlock(stats_map_mutex); + if (insert_ret.second == false) { + warning("stat of this channel already existed"); + delete stat; + stat = insert_ret.first->second; + } else { + debug("********** new channel(%zu) **********", channel_stats.size()); + } + } else { // found + stat = stat_it->second; + } + + stat->increment(body_bytes, 1, user_speed < 64000 ? 1 : 0); + stat->debug_channel(); + +cleanup: + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); +} + +static int +handle_event(TSCont contp, TSEvent event, void *edata) { + TSHttpTxn txnp = (TSHttpTxn) edata; + + switch (event) { + case TS_EVENT_HTTP_READ_REQUEST_HDR: // for global contp + debug("---------- new request ----------"); + handle_read_req(contp, txnp); + break; + case TS_EVENT_HTTP_TXN_CLOSE: // for txn contp + handle_txn_close(contp, txnp); + TSContDestroy(contp); + break; + default: + error("unknown event for this plugin"); + } + + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + + return 0; +} + +// below is api part + +static void +stats_cleanup(TSCont contp, intercept_state * api_state) +{ + if (api_state->req_buffer) { + TSIOBufferDestroy(api_state->req_buffer); + api_state->req_buffer = NULL; + } + + if (api_state->resp_buffer) { + TSIOBufferDestroy(api_state->resp_buffer); + api_state->resp_buffer = NULL; + } + + TSfree(api_state->channel); + TSVConnClose(api_state->net_vc); + TSfree(api_state); + TSContDestroy(contp); +} + +static void +stats_process_accept(TSCont contp, intercept_state * api_state) +{ + + api_state->req_buffer = TSIOBufferCreate(); + api_state->resp_buffer = TSIOBufferCreate(); + api_state->resp_reader = TSIOBufferReaderAlloc(api_state->resp_buffer); + api_state->read_vio = TSVConnRead(api_state->net_vc, contp, api_state->req_buffer, 
INT64_MAX); +} + +static int +stats_add_data_to_resp_buffer(const char *s, intercept_state * api_state) +{ + int s_len = strlen(s); + + TSIOBufferWrite(api_state->resp_buffer, s, s_len); + + return s_len; +} + +static const char RESP_HEADER[] = + "HTTP/1.0 200 Ok\r\nContent-Type: application/json\r\nCache-Control: no-cache\r\n\r\n"; + +static int +stats_add_resp_header(intercept_state * api_state) +{ + return stats_add_data_to_resp_buffer(RESP_HEADER, api_state); +} + +static void +stats_process_read(TSCont contp, TSEvent event, intercept_state * api_state) +{ + debug_api("stats_process_read(%d)", event); + if (event == TS_EVENT_VCONN_READ_READY) { + api_state->output_bytes = stats_add_resp_header(api_state); + TSVConnShutdown(api_state->net_vc, 1, 0); + api_state->write_vio = TSVConnWrite(api_state->net_vc, contp, api_state->resp_reader, INT64_MAX); + } else if (event == TS_EVENT_ERROR) { + error_api("stats_process_read: Received TS_EVENT_ERROR\n"); + } else if (event == TS_EVENT_VCONN_EOS) { + // client may end the connection, simply return + return; + } else if (event == TS_EVENT_NET_ACCEPT_FAILED) { + error_api("stats_process_read: Received TS_EVENT_NET_ACCEPT_FAILED\n"); + } else { + error_api("Unexpected Event %d\n", event); + // TSReleaseAssert(!"Unexpected Event"); + } +} + +#define APPEND(a) api_state->output_bytes += stats_add_data_to_resp_buffer(a, api_state) +#define APPEND_STAT(a, fmt, v) do { \ + char b[256]; \ + if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\",\n", a, v) < (signed)sizeof(b)) \ + APPEND(b); \ +} while(0) +#define APPEND_END_STAT(a, fmt, v) do { \ + char b[256]; \ + if(snprintf(b, sizeof(b), "\"%s\": \"" fmt "\"\n", a, v) < (signed)sizeof(b)) \ + APPEND(b); \ +} while(0) +#define APPEND_DICT_NAME(a) do { \ + char b[256]; \ + if(snprintf(b, sizeof(b), "\"%s\": {\n", a) < (signed)sizeof(b)) \ + APPEND(b); \ +} while(0) + +static void +json_out_stat(TSRecordType rec_type, void *edata, int registered, + const char *name, TSRecordDataType 
data_type, + TSRecordData *datum) { + intercept_state *api_state = (intercept_state *) edata; + + switch(data_type) { + case TS_RECORDDATATYPE_COUNTER: + APPEND_STAT(name, "%" PRId64, datum->rec_counter); break; + case TS_RECORDDATATYPE_INT: + APPEND_STAT(name, "%" PRIu64, datum->rec_int); break; + case TS_RECORDDATATYPE_FLOAT: + APPEND_STAT(name, "%f", datum->rec_float); break; + case TS_RECORDDATATYPE_STRING: + APPEND_STAT(name, "%s", datum->rec_string); break; + default: + debug_api("unkown type for %s: %d", name, data_type); + break; + } +} + +template<class T> +struct compare +: std::binary_function<T,T,bool> +{ + inline bool operator()(const T& lhs, const T& rhs) { + return lhs.second->response_count_2xx > rhs.second->response_count_2xx; + } +}; + +static void +append_channel_stat(intercept_state * api_state, + const std::string channel, channel_stat * cs, + int is_last) +{ + APPEND_DICT_NAME(channel.c_str()); + APPEND_STAT("response.bytes.content", "%" PRIu64, cs->response_bytes_content); + APPEND_STAT("response.count.2xx.get", "%" PRIu64, cs->response_count_2xx); + APPEND_END_STAT("speed.ua.bytes_per_sec_64k", "%" PRIu64, cs->speed_ua_bytes_per_sec_64k); + if (is_last) + APPEND("}\n"); + else + APPEND("},\n"); +} + +static void +json_out_channel_stats(intercept_state * api_state) { + if (channel_stats.empty()) + return; + + typedef std::pair<std::string, channel_stat *> data_pair; + typedef std::vector<data_pair> stats_vec_t; + smap_iterator it; + + debug("appending channel stats"); + + if (api_state->topn > -1 || + (api_state->channel && strlen(api_state->channel) > 0)) { + // will use vector to output + + if (api_state->topn == 0) + return; + + stats_vec_t stats_vec; // a tmp vector to sort or filter + if (strlen(api_state->channel) > 0) { + // filter by channel + size_t found; + for (it=channel_stats.begin(); it != channel_stats.end(); it++) { + found = it->first.find(api_state->channel); + if (found != std::string::npos) + stats_vec.push_back(*it); + } 
+ } else { + for (it=channel_stats.begin(); it != channel_stats.end(); it++) + stats_vec.push_back(*it); + /* stats_vec.assign is not safe when map is being inserted concurrently */ + } + + if (stats_vec.empty()) + return; + + stats_vec_t::size_type out_st = stats_vec.size(); + if (api_state->topn > 0) { // need sort and limit output size + if ((unsigned)api_state->topn < stats_vec.size()) + out_st = (unsigned)api_state->topn; + else + api_state->topn = stats_vec.size(); + std::partial_sort(stats_vec.begin(), stats_vec.begin() + api_state->topn, + stats_vec.end(), compare<data_pair>()); + } // else will output whole vector without sort + + stats_vec_t::size_type i; + for (i = 0; i < out_st - 1; i++) { + append_channel_stat(api_state, stats_vec[i].first, stats_vec[i].second, 0); + } + append_channel_stat(api_state, stats_vec[i].first, stats_vec[i].second, 1); + + } else { + smap_iterator last_it = channel_stats.end(); + last_it--; + for (it = channel_stats.begin(); it != last_it; it++) { + append_channel_stat(api_state, it->first, it->second, 0); + } + append_channel_stat(api_state, it->first, it->second, 1); + } +} + +static void +json_out_stats(intercept_state * api_state) +{ + const char *version; + + APPEND("{ \"channel\": {\n"); + json_out_channel_stats(api_state); + APPEND(" },\n"); + + APPEND(" \"global\": {\n"); + APPEND_STAT("response.count.2xx.get", "%" PRIu64, global_response_count_2xx_get); + APPEND_STAT("response.bytes.content", "%" PRIu64, global_response_bytes_content); + APPEND_STAT("channel.count", "%zu", channel_stats.size()); + + if (api_state->show_global) + TSRecordDump(TS_RECORDTYPE_PROCESS, json_out_stat, api_state); // internal stats + + version = TSTrafficServerVersionGet(); + APPEND("\"server\": \""); + APPEND(version); + APPEND("\"\n"); + + APPEND(" }\n}\n"); +} + +static void +stats_process_write(TSCont contp, TSEvent event, intercept_state * api_state) +{ + if (event == TS_EVENT_VCONN_WRITE_READY) { + if (api_state->body_written == 0) { 
+ debug_api("plugin adding response body"); + api_state->body_written = 1; + if (!api_state->deny) + json_out_stats(api_state); + else + APPEND("forbidden"); + TSVIONBytesSet(api_state->write_vio, api_state->output_bytes); + } + TSVIOReenable(api_state->write_vio); + } else if (TS_EVENT_VCONN_WRITE_COMPLETE) { + stats_cleanup(contp, api_state); + } else if (event == TS_EVENT_ERROR) { + error_api("stats_process_write: Received TS_EVENT_ERROR\n"); + } else { + error_api("Unexpected Event %d\n", event); + // TSReleaseAssert(!"Unexpected Event"); + } +} + +static int +api_handle_event(TSCont contp, TSEvent event, void *edata) +{ + intercept_state *api_state = (intercept_state *) TSContDataGet(contp); + if (event == TS_EVENT_NET_ACCEPT) { + api_state->net_vc = (TSVConn) edata; + stats_process_accept(contp, api_state); + } else if (edata == api_state->read_vio) { + stats_process_read(contp, event, api_state); + } else if (edata == api_state->write_vio) { + stats_process_write(contp, event, api_state); + } else { + error_api("Unexpected Event %d\n", event); + // TSReleaseAssert(!"Unexpected Event"); + } + return 0; +} + +// initial part + +static int +check_ts_version() +{ + const char *ts_version = TSTrafficServerVersionGet(); + int result = 0; + + if (ts_version) { + int major_ts_version = 0; + int minor_ts_version = 0; + int patch_ts_version = 0; + + if (sscanf(ts_version, "%d.%d.%d", &major_ts_version, &minor_ts_version, + &patch_ts_version) != 3) { + return 0; + } + + // Need at least TS 3.0.0 + if (major_ts_version >= 3) { + result = 1; + } + } + + return result; +} + +void +TSPluginInit(int argc, const char *argv[]) +{ + if (argc > 2) { + fatal("plugin does not accept more than 1 argument"); + } else if (argc == 2) { + api_path = std::string(argv[1]); + debug_api("stats api path: %s", api_path.c_str()); + } + + TSPluginRegistrationInfo info; + + info.plugin_name = (char *)PLUGIN_NAME; + info.vendor_name = (char *)"wkl"; + info.support_email = (char *)"[email 
protected]"; + + if (TSPluginRegister(TS_SDK_VERSION_3_0, &info) != TS_SUCCESS) { + fatal("plugin registration failed."); + } + + if (!check_ts_version()) { + fatal("plugin requires Traffic Server 3.0.0 or later"); + } + + info("%s(%s) plugin starting...", PLUGIN_NAME, PLUGIN_VERSION); + + stats_map_mutex = TSMutexCreate(); + + TSCont cont = TSContCreate(handle_event, NULL); + TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, cont); +} + http://git-wip-us.apache.org/repos/asf/trafficserver/blob/de35ad0a/plugins/experimental/channel_stats/debug_macros.h ---------------------------------------------------------------------- diff --git a/plugins/experimental/channel_stats/debug_macros.h b/plugins/experimental/channel_stats/debug_macros.h new file mode 100644 index 0000000..519c608 --- /dev/null +++ b/plugins/experimental/channel_stats/debug_macros.h @@ -0,0 +1,77 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef _DBG_MACROS_H +#define _DBG_MACROS_H + +#define __STDC_FORMAT_MACROS +#include <inttypes.h> + +#define TAG PLUGIN_NAME +#define API_TAG PLUGIN_NAME ".api" + +#define unlikely(x) __builtin_expect(!!(x), 0) +#define likely(x) __builtin_expect(!!(x), 1) + +#define debug_tag(tag, fmt, ...) 
do { \ + if (unlikely(TSIsDebugTagSet(tag))) { \ + TSDebug(tag, fmt, ##__VA_ARGS__); \ + } \ +} while(0) + +#define debug(fmt, ...) \ + debug_tag(TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); + +#define info(fmt, ...) \ + debug_tag(TAG, "INFO: " fmt, ##__VA_ARGS__); + +#define warning(fmt, ...) \ + debug_tag(TAG, "WARNING: " fmt, ##__VA_ARGS__); + +#define error(fmt, ...) do { \ + TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ + debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ +} while (0) + +#define fatal(fmt, ...) do { \ + TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ + debug_tag(TAG, "[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ + exit(-1); \ +} while (0) + +#define debug_api(fmt, ...) \ + debug_tag(API_TAG, "DEBUG: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); + +#define error_api(fmt, ...) do { \ + TSError("[%s:%d] [%s] ERROR: " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ + debug_tag(API_TAG, "ERROR: [%s:%d] [%s] " fmt, __FILE__, __LINE__, __FUNCTION__ , ##__VA_ARGS__); \ +} while (0) + +#define HRTIME_FOREVER (10*HRTIME_DECADE) +#define HRTIME_DECADE (10*HRTIME_YEAR) +#define HRTIME_YEAR (365*HRTIME_DAY+HRTIME_DAY/4) +#define HRTIME_WEEK (7*HRTIME_DAY) +#define HRTIME_DAY (24*HRTIME_HOUR) +#define HRTIME_HOUR (60*HRTIME_MINUTE) +#define HRTIME_MINUTE (60*HRTIME_SECOND) +#define HRTIME_SECOND (1000*HRTIME_MSECOND) +#define HRTIME_MSECOND (1000*HRTIME_USECOND) +#define HRTIME_USECOND (1000*HRTIME_NSECOND) +#define HRTIME_NSECOND (1LL) + +#endif //_DBG_MACROS_H
