Hello list,

Please find attached a patch against collectd-5.4.0 release which aims to allow HTTP push to OpenTSDB (http://opentsdb.net/).

The changes are only in write_http.c file. It is basically a fork of JSON write http to introduce a new JSON format to allow push to OpenTSDB.

OpenTSDB put API is described here :
    http://opentsdb.net/docs/build/html/api_http/put.html

You will need to set the "format" configuration option of write_http plugin to "JSONTSDB"

Another introduced feature is the ability to change the default buffer size of the plugin (default: 4096 bytes). You can now push it to 32768 bytes, which allow to make fewer but bigger http push. The new option is "BufferSize" and takes values between 100 and 32768 bytes. 4096 bytes remains the default value if unspecified.

<Plugin write_http>
       <URL "http://mytsdb:4242/api/put";>
                Format "JSONTSDB"
                #User "myuser"
                #Password "mypass"
                StoreRates false
                BufferSize 32768
       </URL>
</Plugin>

It is successfully used with OpenTSDB 2.0 RC2 and collectd 5.1 (debian stable) and 5.4 (current). Thanks to Acipia Team (www.acipia.fr) for the work !

Pierre

diff -pur collectd-5.4.0/src/write_http.c collectd-5.4.0-mine/src/write_http.c
--- collectd-5.4.0/src/write_http.c	2013-08-18 12:24:25.093973759 +0200
+++ collectd-5.4.0-mine/src/write_http.c	2014-01-17 14:45:05.651654320 +0100
@@ -29,6 +29,7 @@
 #include "utils_cache.h"
 #include "utils_parse_option.h"
 #include "utils_format_json.h"
+#include "utils_subst.h"
 
 #if HAVE_PTHREAD_H
 # include <pthread.h>
@@ -53,12 +54,15 @@ struct wh_callback_s
 
 #define WH_FORMAT_COMMAND 0
 #define WH_FORMAT_JSON    1
+#define WH_FORMAT_JSONTSDB 2
         int format;
 
         CURL *curl;
         char curl_errbuf[CURL_ERROR_SIZE];
 
-        char   send_buffer[4096];
+        char   *send_buffer;
+#define DEFAULT_BUFFER_SIZE  4096
+        double buffer_size;
         size_t send_buffer_free;
         size_t send_buffer_fill;
         cdtime_t send_buffer_init_time;
@@ -69,8 +73,8 @@ typedef struct wh_callback_s wh_callback
 
 static void wh_reset_buffer (wh_callback_t *cb)  /* {{{ */
 {
-        memset (cb->send_buffer, 0, sizeof (cb->send_buffer));
-        cb->send_buffer_free = sizeof (cb->send_buffer);
+        memset (cb->send_buffer, 0, cb->buffer_size);
+        cb->send_buffer_free = cb->buffer_size;
         cb->send_buffer_fill = 0;
         cb->send_buffer_init_time = cdtime ();
 
@@ -85,7 +89,9 @@ static void wh_reset_buffer (wh_callback
 static int wh_send_buffer (wh_callback_t *cb) /* {{{ */
 {
         int status = 0;
+        long return_code = 0;
 
+        DEBUG ("write_http: send buffer with %i bytes", cb->send_buffer_fill) ;
         curl_easy_setopt (cb->curl, CURLOPT_POSTFIELDS, cb->send_buffer);
         status = curl_easy_perform (cb->curl);
         if (status != CURLE_OK)
@@ -93,7 +99,13 @@ static int wh_send_buffer (wh_callback_t
                 ERROR ("write_http plugin: curl_easy_perform failed with "
                                 "status %i: %s",
                                 status, cb->curl_errbuf);
+                DEBUG ("write_http plugin: raw post data %s",
+                                cb->send_buffer);
+        }  else {
+                curl_easy_getinfo(cb->curl, CURLINFO_RESPONSE_CODE, &return_code);
+                DEBUG ("http return code : %ld", return_code) ;
         }
+
         return (status);
 } /* }}} wh_send_buffer */
 
@@ -116,7 +128,7 @@ static int wh_callback_init (wh_callback
 
         headers = NULL;
         headers = curl_slist_append (headers, "Accept:  */*");
-        if (cb->format == WH_FORMAT_JSON)
+        if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_JSONTSDB)
                 headers = curl_slist_append (headers, "Content-Type: application/json");
         else
                 headers = curl_slist_append (headers, "Content-Type: text/plain");
@@ -188,7 +200,7 @@ static int wh_flush_nolock (cdtime_t tim
                 status = wh_send_buffer (cb);
                 wh_reset_buffer (cb);
         }
-        else if (cb->format == WH_FORMAT_JSON)
+        else if (cb->format == WH_FORMAT_JSON || cb->format == WH_FORMAT_JSONTSDB)
         {
                 if (cb->send_buffer_fill <= 2)
                 {
@@ -269,7 +281,7 @@ static void wh_callback_free (void *data
         sfree (cb->pass);
         sfree (cb->credentials);
         sfree (cb->cacert);
-
+        sfree (cb->send_buffer);
         sfree (cb);
 } /* }}} void wh_callback_free */
 
@@ -414,6 +426,158 @@ static int wh_write_json (const data_set
         return (0);
 } /* }}} int wh_write_json */
 
+static int wh_write_json_tsdb(const data_set_t *ds, const value_list_t *vl, /* {{{ */
+                wh_callback_t *cb)
+{
+        char key[10*DATA_MAX_NAME_LEN];
+        char values[512];
+        char tags[512];
+        char command[1024];
+        size_t command_len = 0;
+        int i ;
+        gauge_t *rates = NULL;
+
+        // local vars
+        char hostname_t[DATA_MAX_NAME_LEN];
+        char plugin_t[DATA_MAX_NAME_LEN];
+        char type_t[DATA_MAX_NAME_LEN];
+        char command_t[1024];
+        char temp[DATA_MAX_NAME_LEN];
+
+        int status;
+        int tags_len = sizeof(tags);
+
+        if (0 != strcmp (ds->type, vl->type)) {
+                ERROR ("write_http plugin: DS type does not match "
+                                "value list type");
+                return -1;
+        }
+
+        // TSDB tags dont support spaces
+        if (subst_string (temp, sizeof (temp), (vl)->host, " ", "_") != NULL)
+                sstrncpy (hostname_t, temp, sizeof (hostname_t));
+        if (subst_string (temp, sizeof (temp), (vl)->plugin_instance, " ", "_") != NULL)
+                sstrncpy (plugin_t, temp, sizeof (plugin_t));
+        if (subst_string (temp, sizeof (temp), (vl)->type_instance, " ", "_") != NULL)
+                sstrncpy (type_t, temp, sizeof (type_t));
+
+
+        // tag list
+          if ((plugin_t == NULL) || (strlen (plugin_t) == 0))
+	        {
+           if ((type_t == NULL) || (strlen (type_t) == 0))
+	         {
+            status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\"}", hostname_t) ;
+           } else {
+            status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"type\": \"%s\"}", hostname_t, type_t) ;
+           }
+          } else {
+           if ((type_t == NULL) || (strlen (type_t) == 0))
+	         {
+            status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"plugin\": \"%s\"}", hostname_t, plugin_t) ;
+           } else {
+            status = ssnprintf (tags, tags_len, "\"tags\": {\"host\": \"%s\", \"plugin\": \"%s\", \"type\": \"%s\"}", hostname_t, plugin_t, type_t) ;
+           }
+          }
+	if (status < 1) {
+            WARNING ("write_http_jsontsdb plugin: unable to build tag list");
+	    return (-1);
+        }
+
+        // value list loop
+        for (i = 0; i < ds->ds_num; i++)
+        {
+                ssnprintf(key, sizeof(key), "%s.%s", (vl)->type, ds->ds[i].name);
+
+                if (ds->ds[i].type == DS_TYPE_GAUGE)
+                        ssnprintf (values, sizeof(values), "\"value\": %f", vl->values[i].gauge);
+                else if (cb->store_rates)
+                {
+                        if (rates == NULL)
+                                rates = uc_get_rate (ds, vl);
+                        if (rates == NULL)
+                        {
+                                WARNING ("format_values: "
+						"uc_get_rate failed.");
+                                return (-1);
+                        }
+                        ssnprintf (values, sizeof(values), "\"value\": %g", rates[i]);
+                }
+                else if (ds->ds[i].type == DS_TYPE_COUNTER)
+                        ssnprintf (values, sizeof(values), "\"value\": %llu", vl->values[i].counter);
+                else if (ds->ds[i].type == DS_TYPE_DERIVE)
+                        ssnprintf (values, sizeof(values), "\"value\": %"PRIi64, vl->values[i].derive);
+                else if (ds->ds[i].type == DS_TYPE_ABSOLUTE)
+                        ssnprintf (values, sizeof(values), "\"value\": %"PRIu64, vl->values[i].absolute);
+                else
+                {
+                        ERROR ("format_values plugin: Unknown data source type: %i",
+                                        ds->ds[i].type);
+                        sfree (rates);
+                        return (-1);
+                }
+                        ssnprintf (command_t, sizeof(command_t),
+                                ",{ \"metric\": \"%s\", \"timestamp\": \"%.0f\", %s, %s}\r\n",
+                                key,
+                                CDTIME_T_TO_DOUBLE (vl->time),
+                                values,
+                                tags
+                        );
+
+                        strncat(command, command_t, (sizeof(command) - strlen(command))) ;
+
+                if (command_len >= sizeof (command)) {
+                        ERROR ("write_http plugin: Command buffer too small: "
+                                        "Need %zu bytes.", command_len + 1);
+                        return (-1);
+                }
+        }
+
+        command_len = (size_t) strlen(command);
+
+        pthread_mutex_lock (&cb->send_lock);
+
+        if (cb->curl == NULL)
+        {
+                status = wh_callback_init (cb);
+                if (status != 0)
+                {
+                        ERROR ("write_http plugin: wh_callback_init failed.");
+                        pthread_mutex_unlock (&cb->send_lock);
+                        return (-1);
+                }
+        }
+
+        if (command_len >= cb->send_buffer_free)
+        {
+                status = wh_flush_nolock (/* timeout = */ 0, cb);
+                if (status != 0)
+                {
+                        pthread_mutex_unlock (&cb->send_lock);
+                        return (status);
+                }
+        }
+        assert (command_len < cb->send_buffer_free);
+
+        /* `command_len + 1' because `command_len' does not include the
+         * trailing null byte. Neither does `send_buffer_fill'. */
+        memcpy (cb->send_buffer + cb->send_buffer_fill,
+                        command, command_len + 1);
+        cb->send_buffer_fill += command_len;
+        cb->send_buffer_free -= command_len;
+
+        DEBUG ("write_http plugin: <%s> buffer %zu/%g (%g%%) \"%s\"",
+                        cb->location,
+                        cb->send_buffer_fill, cb->buffer_size,
+                        100.0 * ((double) cb->send_buffer_fill) / ((double) cb->buffer_size),
+                        command);
+
+        /* Check if we have enough space for this command. */
+        pthread_mutex_unlock (&cb->send_lock);
+
+        return (0);
+} /* }}} int wh_write_json_tsdb */
+
 static int wh_write (const data_set_t *ds, const value_list_t *vl, /* {{{ */
                 user_data_t *user_data)
 {
@@ -427,6 +591,8 @@ static int wh_write (const data_set_t *d
 
         if (cb->format == WH_FORMAT_JSON)
                 status = wh_write_json (ds, vl, cb);
+        else if (cb->format == WH_FORMAT_JSONTSDB)
+                status = wh_write_json_tsdb (ds, vl, cb);
         else
                 status = wh_write_command (ds, vl, cb);
 
@@ -492,6 +658,8 @@ static int config_set_format (wh_callbac
                 cb->format = WH_FORMAT_COMMAND;
         else if (strcasecmp ("JSON", string) == 0)
                 cb->format = WH_FORMAT_JSON;
+        else if (strcasecmp ("JSONTSDB", string) == 0)
+                cb->format = WH_FORMAT_JSONTSDB;
         else
         {
                 ERROR ("write_http plugin: Invalid format string: %s",
@@ -502,6 +670,28 @@ static int config_set_format (wh_callbac
         return (0);
 } /* }}} int config_set_string */
 
+static double config_set_bs (oconfig_item_t *ci /* {{{ */
+                )
+{
+        // set buffer size from config file
+        if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_NUMBER))
+        {
+                WARNING ("write_http plugin: The `%s' config option "
+                                "needs exactly one numeric argument (min 100 / max 32768).", ci->key);
+                return (DEFAULT_BUFFER_SIZE);
+        }
+
+        if ( ( ci->values[0].value.number >= 100 ) && ( ci->values[0].value.number <= 32768 ) )
+                return (ci->values[0].value.number);
+        else {
+                WARNING ("write_http plugin: The `%s' config option "
+                                "needs to be between 100 and 32768. default: 4096.", ci->key);
+                return (DEFAULT_BUFFER_SIZE);
+        }
+        return (DEFAULT_BUFFER_SIZE) ;
+        
+} /* }}} int config_set_bs */
+
 static int wh_config_url (oconfig_item_t *ci) /* {{{ */
 {
         wh_callback_t *cb;
@@ -522,6 +712,8 @@ static int wh_config_url (oconfig_item_t
         cb->verify_peer = 1;
         cb->verify_host = 1;
         cb->cacert = NULL;
+        cb->send_buffer = NULL;
+        cb->buffer_size = DEFAULT_BUFFER_SIZE;
         cb->format = WH_FORMAT_COMMAND;
         cb->curl = NULL;
 
@@ -549,13 +741,19 @@ static int wh_config_url (oconfig_item_t
                         config_set_format (cb, child);
                 else if (strcasecmp ("StoreRates", child->key) == 0)
                         config_set_boolean (&cb->store_rates, child);
-                else
+                else if (strcasecmp ("BufferSize", child->key) == 0) {
+                        cb->buffer_size = config_set_bs(child);
+                } else
                 {
                         ERROR ("write_http plugin: Invalid configuration "
                                         "option: %s.", child->key);
                 }
+               
         }
 
+        cb->send_buffer = (char*)malloc((cb->buffer_size)*sizeof (cb->send_buffer));
+        cb->send_buffer[0] = '\0';
+        DEBUG("write_http plugin: set send buffer to %g", cb->buffer_size); 
         DEBUG ("write_http: Registering write callback with URL %s",
                         cb->location);
 
_______________________________________________
collectd mailing list
[email protected]
http://mailman.verplant.org/listinfo/collectd

Reply via email to