Hello list,
Please find attached a patch against collectd-5.4.0 release which aims
to allow HTTP push to OpenTSDB (http://opentsdb.net/).
The changes are only in write_http.c file. It is basically a fork of
JSON write http to introduce a new JSON format to allow push to OpenTSDB.
OpenTSDB put API is described here :
http://opentsdb.net/docs/build/html/api_http/put.html
You will need to set the "format" configuration option of write_http
plugin to "JSONTSDB"
Another introduced feature is the ability to change the default buffer
size of the plugin (default: 4096 bytes). You can now push it to 32768
bytes, which allow to make fewer but bigger http push. The new option is
"BufferSize" and takes values between 100 and 32768 bytes. 4096 bytes
remains the default value if unspecified.
<Plugin write_http>
<URL "http://mytsdb:4242/api/put">
Format "JSONTSDB"
#User "myuser"
#Password "mypass"
StoreRates false
BufferSize 32768
</URL>
</Plugin>
It is successfully used with OpenTSDB 2.0 RC2 and collectd 5.1 (debian
stable) and 5.4 (current). Thanks to Acipia Team (www.acipia.fr) for the
work !
Pierre
diff -pur collectd-5.4.0/src/write_http.c collectd-5.4.0-mine/src/write_http.c
--- collectd-5.4.0/src/write_http.c 2013-08-18 12:24:25.093973759 +0200
+++ collectd-5.4.0-mine/src/write_http.c 2014-01-17 14:45:05.651654320 +0100
@@ -29,6 +29,7 @@
#include "utils_cache.h"
#include "utils_parse_option.h"
#include "utils_format_json.h"
+#include "utils_subst.h"
#if HAVE_PTHREAD_H
# include <pthread.h>
@@ -53,12 +54,15 @@ struct wh_callback_s
#define WH_FORMAT_COMMAND 0
#define WH_FORMAT_JSON 1
+#define WH_FORMAT_JSONTSDB 2
int format;
CURL *curl;
char curl_errbuf[CURL_ERROR_SIZE];
- char send_buffer[4096];
+ char *send_buffer;
+#define DEFAULT_BUFFER_SIZE 4096
+ double buffer_size;
size_t send_buffer_free;
size_t send_buffer_fill;
cdtime_t send_buffer_init_time;
@@ -69,8 +73,8 @@ typedef struct wh_callback_s wh_callback
static void wh_reset_buffer (wh_callback_t *cb) /* {{{ */
{
- memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
- cb->send_buffer_free = sizeof (cb->send_buffer);
+ memset (cb->send_buffer, 0, cb->buffer_size);
+ cb->send_buffer_free = cb->buffer_size;
cb->send_buffer_fill = 0;
cb->send_buffer_init_time = cdtime ();
@@ -85,7 +89,9 @@ static void wh_reset_buffer (wh_callback
static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
{
int status = 0;
+ long return_code = 0;
+ DEBUG ("write_http: send buffer with %i bytes", cb->send_buffer_fill) ;
curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
status = curl_easy_perform (cb->curl);
if (status != CURLE_OK)
@@ -93,7 +99,13 @@ static int wh_send_buffer (wh_callback_t
ERROR ("write_http plugin: curl_easy_perform failed with "
"status %i: %s",
status, cb->curl_errbuf);
+ DEBUG ("write_http plugin: raw post data %s",
+ cb->send_buffer);
+ } else {
+ curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &return_code);
+ DEBUG ("http return code : %ld", return_code) ;
}
+
return (status);
} /* }}} wh_send_buffer */
@@ -116,7 +128,7 @@ static int wh_callback_init (wh_callback
headers = NULL;
headers = curl_slist_append (headers, "Accept: */*");
- if (cb->format == WH_FORMAT_JSON)
+ if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_JSONTSDB)
headers = curl_slist_append (headers, "Content-Type: application/json");
else
headers = curl_slist_append (headers, "Content-Type: text/plain");
@@ -188,7 +200,7 @@ static int wh_flush_nolock (cdtime_t tim
status = wh_send_buffer (cb);
wh_reset_buffer (cb);
}
- else if (cb->format == WH_FORMAT_JSON)
+ else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_JSONTSDB)
{
if (cb->send_buffer_fill <= 2)
{
@@ -269,7 +281,7 @@ static void wh_callback_free (void *data
sfree (cb->pass);
sfree (cb->credentials);
sfree (cb->cacert);
-
+ sfree (cb->send_buffer);
sfree (cb);
} /* }}} void wh_callback_free */
@@ -414,6 +426,158 @@ static int wh_write_json (const data_set
return (0);
} /* }}} int wh_write_json */
+static int wh_write_json_tsdb(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+ wh_callback_t *cb)
+{
+ char key[10*DATA_MAX_NAME_LEN];
+ char values[512];
+ char tags[512];
+ char command[1024];
+ size_t command_len = 0;
+ int i ;
+ gauge_t *rates = NULL;
+
+ // local vars
+ char hostname_t[DATA_MAX_NAME_LEN];
+ char plugin_t[DATA_MAX_NAME_LEN];
+ char type_t[DATA_MAX_NAME_LEN];
+ char command_t[1024];
+ char temp[DATA_MAX_NAME_LEN];
+
+ int status;
+ int tags_len = sizeof(tags);
+
+ if (0 != strcmp (ds->type, vl->type)) {
+ ERROR ("write_http plugin: DS type does not match "
+ "value list type");
+ return -1;
+ }
+
+ // TSDB tags dont support spaces
+ if (subst_string (temp, sizeof (temp), (vl)->host, " ", "_") != NULL)
+ sstrncpy (hostname_t, temp, sizeof (hostname_t));
+ if (subst_string (temp, sizeof (temp), (vl)->plugin_instance, " ", "_") != NULL)
+ sstrncpy (plugin_t, temp, sizeof (plugin_t));
+ if (subst_string (temp, sizeof (temp), (vl)->type_instance, " ", "_") != NULL)
+ sstrncpy (type_t, temp, sizeof (type_t));
+
+
+ // tag list
+ if ((plugin_t == NULL) || (strlen (plugin_t) == 0))
+ {
+ if ((type_t == NULL) || (strlen (type_t) == 0))
+ {
+ status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\"}", hostname_t) ;
+ } else {
+ status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"type\": \"%s\"}", hostname_t, type_t) ;
+ }
+ } else {
+ if ((type_t == NULL) || (strlen (type_t) == 0))
+ {
+ status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"plugin\": \"%s\"}", hostname_t, plugin_t) ;
+ } else {
+ status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"plugin\": \"%s\", \"type\": \"%s\"}", hostname_t, plugin_t, type_t) ;
+ }
+ }
+ if (status < 1) {
+ WARNING ("write_http_jsontsdb plugin: unable to build tag list");
+ return (-1);
+ }
+
+ // value list loop
+ for (i = 0; i < ds->ds_num; i++)
+ {
+ ssnprintf(key, sizeof(key), "%s.%s", (vl)->type, ds->ds[i].name);
+
+ if (ds->ds[i].type == DS_TYPE_GAUGE)
+ ssnprintf (values, sizeof(values), "\"value\": %f", vl->values[i].gauge);
+ else if (cb->store_rates)
+ {
+ if (rates == NULL)
+ rates = uc_get_rate (ds, vl);
+ if (rates == NULL)
+ {
+ WARNING ("format_values: "
+ "uc_get_rate failed.");
+ return (-1);
+ }
+ ssnprintf (values, sizeof(values), "\"value\": %g", rates[i]);
+ }
+ else if (ds->ds[i].type == DS_TYPE_COUNTER)
+ ssnprintf (values, sizeof(values), "\"value\": %llu", vl->values[i].counter);
+ else if (ds->ds[i].type == DS_TYPE_DERIVE)
+ ssnprintf (values, sizeof(values), "\"value\": %"PRIi64, vl->values[i].derive);
+ else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+ ssnprintf (values, sizeof(values), "\"value\": %"PRIu64, vl->values[i].absolute);
+ else
+ {
+ ERROR ("format_values plugin: Unknown data source type: %i",
+ ds->ds[i].type);
+ sfree (rates);
+ return (-1);
+ }
+ ssnprintf (command_t, sizeof(command_t),
+ ",{ \"metric\": \"%s\", \"timestamp\": \"%.0f\", %s, %s}\r\n",
+ key,
+ CDTIME_T_TO_DOUBLE (vl->time),
+ values,
+ tags
+ );
+
+ strncat(command, command_t, (sizeof(command) - strlen(command))) ;
+
+ if (command_len >= sizeof (command)) {
+ ERROR ("write_http plugin: Command buffer too small: "
+ "Need %zu bytes.", command_len + 1);
+ return (-1);
+ }
+ }
+
+ command_len = (size_t) strlen(command);
+
+ pthread_mutex_lock (&cb->send_lock);
+
+ if (cb->curl == NULL)
+ {
+ status = wh_callback_init (cb);
+ if (status != 0)
+ {
+ ERROR ("write_http plugin: wh_callback_init failed.");
+ pthread_mutex_unlock (&cb->send_lock);
+ return (-1);
+ }
+ }
+
+ if (command_len >= cb->send_buffer_free)
+ {
+ status = wh_flush_nolock (/* timeout = */ 0, cb);
+ if (status != 0)
+ {
+ pthread_mutex_unlock (&cb->send_lock);
+ return (status);
+ }
+ }
+ assert (command_len < cb->send_buffer_free);
+
+ /* `command_len + 1' because `command_len' does not include the
+ * trailing null byte. Neither does `send_buffer_fill'. */
+ memcpy (cb->send_buffer + cb->send_buffer_fill,
+ command, command_len + 1);
+ cb->send_buffer_fill += command_len;
+ cb->send_buffer_free -= command_len;
+
+ DEBUG ("write_http plugin: <%s> buffer %zu/%g (%g%%) \"%s\"",
+ cb->location,
+ cb->send_buffer_fill, cb->buffer_size,
+ 100.0 * ((double) cb->send_buffer_fill) / ((double) cb->buffer_size),
+ command);
+
+ /* Check if we have enough space for this command. */
+ pthread_mutex_unlock (&cb->send_lock);
+
+ return (0);
+} /* }}} int wh_write_json_tsdb */
+
static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
user_data_t *user_data)
{
@@ -427,6 +591,8 @@ static int wh_write (const data_set_t *d
if (cb->format == WH_FORMAT_JSON)
status = wh_write_json (ds, vl, cb);
+ else if (cb->format == WH_FORMAT_JSONTSDB)
+ status = wh_write_json_tsdb (ds, vl, cb);
else
status = wh_write_command (ds, vl, cb);
@@ -492,6 +658,8 @@ static int config_set_format (wh_callbac
cb->format = WH_FORMAT_COMMAND;
else if (strcasecmp ("JSON", string) == 0)
cb->format = WH_FORMAT_JSON;
+ else if (strcasecmp ("JSONTSDB", string) == 0)
+ cb->format = WH_FORMAT_JSONTSDB;
else
{
ERROR ("write_http plugin: Invalid format string: %s",
@@ -502,6 +670,28 @@ static int config_set_format (wh_callbac
return (0);
} /* }}} int config_set_string */
+static double config_set_bs (oconfig_item_t *ci /* {{{ */
+ )
+{
+ // set buffer size from config file
+ if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
+ {
+ WARNING ("write_http plugin: The `%s' config option "
+ "needs exactly one numeric argument (min 100 / max 32768).", ci->key);
+ return (DEFAULT_BUFFER_SIZE);
+ }
+
+ if ( ( ci->values[0].value.number >= 100 ) && ( ci->values[0].value.number <= 32768 ) )
+ return (ci->values[0].value.number);
+ else {
+ WARNING ("write_http plugin: The `%s' config option "
+ "needs to be between 100 and 32768. default: 4096.", ci->key);
+ return (DEFAULT_BUFFER_SIZE);
+ }
+ return (DEFAULT_BUFFER_SIZE) ;
+
+} /* }}} int config_set_bs */
+
static int wh_config_url (oconfig_item_t *ci) /* {{{ */
{
wh_callback_t *cb;
@@ -522,6 +712,8 @@ static int wh_config_url (oconfig_item_t
cb->verify_peer = 1;
cb->verify_host = 1;
cb->cacert = NULL;
+ cb->send_buffer = NULL;
+ cb->buffer_size = DEFAULT_BUFFER_SIZE;
cb->format = WH_FORMAT_COMMAND;
cb->curl = NULL;
@@ -549,13 +741,19 @@ static int wh_config_url (oconfig_item_t
config_set_format (cb, child);
else if (strcasecmp ("StoreRates", child->key) == 0)
config_set_boolean (&cb->store_rates, child);
- else
+ else if (strcasecmp ("BufferSize", child->key) == 0) {
+ cb->buffer_size = config_set_bs(child);
+ } else
{
ERROR ("write_http plugin: Invalid configuration "
"option: %s.", child->key);
}
+
}
+ cb->send_buffer = (char*)malloc((cb->buffer_size)*sizeof (cb->send_buffer));
+ cb->send_buffer[0] = '\0';
+ DEBUG("write_http plugin: set send buffer to %g", cb->buffer_size);
DEBUG ("write_http: Registering write callback with URL %s",
cb->location);
_______________________________________________
collectd mailing list
[email protected]
http://mailman.verplant.org/listinfo/collectd