Hi all,
For my needs at work I wrote my second plugin for
collectd : write_mysql
I hope it make you more enthusiastic than my
first plugin
(http://mailman.verplant.org/pipermail/collectd/2011-October/004766.html
[1]).
This plugin allow you to write data into MySQL in a relational
scheme.
In attachment :
- collectd-write_mysql.patch (The patch)
-
write_mysql.conf (conf file for collectd)
- mysql-collectd-scheme.sql
(database and tables creation SQL)
Regards,
Cyril Feraudet
Links:
------
[1]
http://mailman.verplant.org/pipermail/collectd/2011-October/004766.html
--- ../collectd-5.0.0/configure.in 2011-03-27 18:35:08.000000000 +0200
+++ ./configure.in 2011-11-07 13:38:55.000000000 +0100
@@ -2242,7 +2242,7 @@
fi
if test "x$with_libmysql" = "xyes"
then
- with_mysql_libs=`$with_mysql_config --libs 2>/dev/null`
+ with_mysql_libs=`$with_mysql_config --libs_r 2>/dev/null`
mysql_config_status=$?
if test $mysql_config_status -ne 0
@@ -4713,6 +4713,7 @@
AC_PLUGIN([vserver], [$plugin_vserver], [Linux VServer statistics])
AC_PLUGIN([wireless], [$plugin_wireless], [Wireless statistics])
AC_PLUGIN([write_http], [$with_libcurl], [HTTP output plugin])
+AC_PLUGIN([write_mysql], [$with_libmysql], [MySQL output plugin])
AC_PLUGIN([write_redis], [$with_libcredis], [Redis output plugin])
AC_PLUGIN([xmms], [$with_libxmms], [XMMS statistics])
AC_PLUGIN([zfs_arc], [$plugin_zfs_arc], [ZFS ARC statistics])
@@ -5039,6 +5040,7 @@
vserver . . . . . . . $enable_vserver
wireless . . . . . . $enable_wireless
write_http . . . . . $enable_write_http
+ write_mysql . . . . . $enable_write_mysql
write_redis . . . . . $enable_write_redis
xmms . . . . . . . . $enable_xmms
zfs_arc . . . . . . . $enable_zfs_arc
--- ../collectd-5.0.0/src/Makefile.am 2011-03-27 18:35:08.000000000 +0200
+++ ./src/Makefile.am 2011-11-07 13:40:01.000000000 +0100
@@ -259,6 +259,18 @@
collectd_DEPENDENCIES += csv.la
endif
+if BUILD_PLUGIN_WRITE_MYSQL
+pkglib_LTLIBRARIES += write_mysql.la
+write_mysql_la_SOURCES = write_mysql.c
+write_mysql_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" write_mysql.la
+if BUILD_WITH_LIBMYSQL
+write_mysql_la_CFLAGS = $(BUILD_WITH_LIBMYSQL_CFLAGS)
+write_mysql_la_LIBADD = $(BUILD_WITH_LIBMYSQL_LIBS) -lmysqlclient_r
+endif
+collectd_DEPENDENCIES += write_mysql.la
+endif
+
if BUILD_PLUGIN_CURL
pkglib_LTLIBRARIES += curl.la
curl_la_SOURCES = curl.c
--- ../collectd-5.0.0/src/write_mysql.c 1970-01-01 01:00:00.000000000 +0100
+++ ./src/write_mysql.c 2011-11-07 13:34:41.000000000 +0100
@@ -0,0 +1,391 @@
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#ifdef HAVE_MYSQL_H
+#include <mysql.h>
+#elif defined(HAVE_MYSQL_MYSQL_H)
+#include <mysql/mysql.h>
+#endif
+#include <pthread.h>
+
+typedef struct host_s host_t;
+struct host_s {
+ char name[DATA_MAX_NAME_LEN];
+ int id;
+ host_t *next_host;
+};
+
+typedef struct plugin_s plugin_t;
+struct plugin_s {
+ char name[DATA_MAX_NAME_LEN];
+ int id;
+ plugin_t *next_plugin;
+};
+
+typedef struct type_s type_t;
+struct type_s {
+ char name[DATA_MAX_NAME_LEN];
+ int id;
+ type_t *next_type;
+};
+
+typedef struct dataset_s dataset_t;
+struct dataset_s {
+ char name[DATA_MAX_NAME_LEN];
+ int id;
+ int type_id;
+ dataset_t *next_dataset;
+};
+
+
+static const char *config_keys[] =
+{
+ "Host",
+ "User",
+ "Passwd",
+ "Database",
+ "Port"
+};
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+char *host = "localhost";
+char *user = "root";
+char *passwd = "";
+char *database = "collectd";
+int port = 0;
+
+host_t *host_list = NULL;
+plugin_t *plugin_list = NULL;
+type_t *type_list = NULL;
+dataset_t *dataset_list = NULL;
+MYSQL *conn;
+pthread_mutex_t mutex;
+
+static int write_mysql_config (const char *key, const char *value) {
+ if (strcasecmp ("Host", key) == 0) {
+ host = strdup (value);
+ } else if (strcasecmp ("User", key) == 0) {
+ user = strdup (value);
+ } else if (strcasecmp ("Passwd", key) == 0) {
+ passwd = strdup (value);
+ } else if (strcasecmp ("Database", key) == 0) {
+ database = strdup (value);
+ } else if (strcasecmp ("Port", key) == 0) {
+ port = value;
+ }
+}
+
+static int write_mysql_init (void) {
+ conn = mysql_init(NULL);
+ if (mysql_real_connect(conn, host, user, passwd, database, port, NULL, 0) == NULL) {
+ ERROR("write_mysql plugin: Failed to connect to database %s "
+ " at server %s with user %s : %s", database, host, user, mysql_error (conn));
+ plugin_unregister_write ("write_mysql");
+ return (-1);
+ }
+ if (!mysql_thread_safe()) {
+ ERROR("write_mysql plugin: mysqlclient Thread Safe OFF");
+ return (-1);
+ } else {
+ DEBUG("write_mysql plugin: mysqlclient Thread Safe ON");
+ }
+ return (0);
+}
+
+
+static int add_host_id (char *hostname) {
+ int id, len;
+ id = '\0';
+ MYSQL_RES *result;
+ MYSQL_ROW row;
+ int num_fields;
+ char query[1024];
+ host_t *newhost;
+ len = ssnprintf (query, sizeof (query), "SELECT id FROM host WHERE name = '%s'", hostname);
+ DEBUG("write_mysql plugin: %s", query);
+ pthread_mutex_lock(&mutex);
+ mysql_real_query(conn, query, len);
+ result = mysql_store_result(conn);
+ if (row = mysql_fetch_row(result)) {
+ id = atoi(row[0]);
+ mysql_free_result(result);
+ pthread_mutex_unlock(&mutex);
+ } else {
+ mysql_free_result(result);
+ len = ssnprintf (query, sizeof (query), "INSERT INTO host (name) VALUES ('%s')", hostname);
+ DEBUG("write_mysql plugin: %s", query);
+ mysql_real_query(conn, query, len);
+ id = (int)mysql_insert_id(conn);
+ }
+ newhost = malloc(sizeof(host_t));
+ sstrncpy (newhost->name, hostname, sizeof(newhost->name));
+ newhost->id = id;
+ newhost->next_host = NULL;
+ if (host_list == NULL) {
+ host_list = newhost;
+ } else {
+ host_t *host = host_list;
+ while (host->next_host != NULL) {
+ host = host->next_host;
+ }
+ host->next_host = newhost;
+ }
+ pthread_mutex_unlock(&mutex);
+ return id;
+}
+
+static int add_plugin_id (char *pluginname) {
+ int id, len;
+ id = '\0';
+ MYSQL_RES *result;
+ MYSQL_ROW row;
+ int num_fields;
+ char query[1024];
+ plugin_t *newplugin;
+ len = ssnprintf (query, sizeof (query), "SELECT id FROM plugin WHERE name = '%s'", pluginname);
+ DEBUG("write_mysql plugin: %s", query);
+ pthread_mutex_lock(&mutex);
+ mysql_real_query(conn, query, len);
+ result = mysql_store_result(conn);
+ if (row = mysql_fetch_row(result)) {
+ id = atoi(row[0]);
+ mysql_free_result(result);
+ pthread_mutex_unlock(&mutex);
+ } else {
+ mysql_free_result(result);
+ len = ssnprintf (query, sizeof (query), "INSERT INTO plugin (name) VALUES ('%s')", pluginname);
+ DEBUG("write_mysql plugin: %s", query);
+ mysql_real_query(conn, query, len);
+ id = (int)mysql_insert_id(conn);
+ }
+ newplugin = malloc(sizeof(plugin_t));
+ sstrncpy (newplugin->name, pluginname, sizeof(newplugin->name));
+ newplugin->id = id;
+ newplugin->next_plugin = NULL;
+ if (plugin_list == NULL) {
+ plugin_list = newplugin;
+ } else {
+ plugin_t *plugin = plugin_list;
+ while (plugin->next_plugin != NULL) {
+ plugin = plugin->next_plugin;
+ }
+ plugin->next_plugin = newplugin;
+ }
+ pthread_mutex_unlock(&mutex);
+ return id;
+}
+
+static int add_type_id (char *typename) {
+ int id, len;
+ id = '\0';
+ MYSQL_RES *result;
+ MYSQL_ROW row;
+ int num_fields;
+ char query[1024];
+ type_t *newtype;
+ len = ssnprintf (query, sizeof (query), "SELECT id FROM type WHERE name = '%s'", typename);
+ DEBUG("write_mysql plugin: %s", query);
+ pthread_mutex_lock(&mutex);
+ mysql_real_query(conn, query, len);
+ result = mysql_store_result(conn);
+ if (row = mysql_fetch_row(result)) {
+ id = atoi(row[0]);
+ mysql_free_result(result);
+ pthread_mutex_unlock(&mutex);
+ } else {
+ mysql_free_result(result);
+ len = ssnprintf (query, sizeof (query), "INSERT INTO type (name) VALUES ('%s')", typename);
+ DEBUG("write_mysql plugin: %s", query);
+ mysql_real_query(conn, query, len);
+ id = (int)mysql_insert_id(conn);
+ }
+ newtype = malloc(sizeof(type_t));
+ sstrncpy (newtype->name, typename, sizeof(newtype->name));
+ newtype->id = id;
+ newtype->next_type = NULL;
+ if (type_list == NULL) {
+ type_list = newtype;
+ } else {
+ type_t *type = type_list;
+ while (type->next_type != NULL) {
+ type = type->next_type;
+ }
+ type->next_type = newtype;
+ }
+ pthread_mutex_unlock(&mutex);
+ return id;
+}
+
+static int add_dataset_id (data_source_t *ds, int type_id) {
+ int id, len;
+ id = '\0';
+ MYSQL_RES *result;
+ MYSQL_ROW row;
+ int num_fields;
+ char query[1024];
+ char *type;
+ dataset_t *newdataset;
+ len = ssnprintf (query, sizeof (query), "SELECT id FROM dataset WHERE name = '%s' AND type_id = '%d'", ds->name, type_id);
+ DEBUG("write_mysql plugin: %s", query);
+ pthread_mutex_lock(&mutex);
+ mysql_real_query(conn, query, len);
+ result = mysql_store_result(conn);
+ if (row = mysql_fetch_row(result)) {
+ id = atoi(row[0]);
+ mysql_free_result(result);
+ pthread_mutex_unlock(&mutex);
+ } else {
+ mysql_free_result(result);
+ switch(ds->type) {
+ case DS_TYPE_COUNTER:
+ type = "COUNTER";
+ break;
+ case DS_TYPE_DERIVE:
+ type = "DERIVE";
+ break;
+ case DS_TYPE_ABSOLUTE:
+ type = "ABSOLUTE";
+ break;
+ default:
+ type = "GAUGE";
+ break;
+ }
+ len = ssnprintf (query, sizeof (query), "INSERT INTO dataset (name,type_id,type,min,max) VALUES ('%s','%d','%s','%f','%f')", ds->name, type_id, type, ds->min, ds->max);
+ DEBUG("write_mysql plugin: %s", query);
+ mysql_real_query(conn, query, len);
+ id = (int)mysql_insert_id(conn);
+ }
+ newdataset = malloc(sizeof(dataset_t));
+ sstrncpy (newdataset->name, ds->name, sizeof(newdataset->name));
+ newdataset->id = id;
+ newdataset->type_id = type_id;
+ newdataset->next_dataset = NULL;
+ if (dataset_list == NULL) {
+ dataset_list = newdataset;
+ } else {
+ dataset_t *dataset = dataset_list;
+ while (dataset->next_dataset != NULL) {
+ dataset = dataset->next_dataset;
+ }
+ dataset->next_dataset = newdataset;
+ }
+ pthread_mutex_unlock(&mutex);
+ return id;
+}
+
+static int get_host_id (char *hostname) {
+ int id;
+ id = '\0';
+ host_t *host = host_list;
+ while (host) {
+ if (strcmp (hostname, host->name) == 0) {
+ id = host->id;
+ break;
+ }
+ host = host->next_host;
+ }
+ if (!id) {
+ id = add_host_id(hostname);
+ }
+ return (id);
+}
+
+static int get_plugin_id (char *pluginname) {
+ int id;
+ id = '\0';
+ plugin_t *plugin = plugin_list;
+ while (plugin) {
+ if (strcmp (pluginname, plugin->name) == 0) {
+ id = plugin->id;
+ break;
+ }
+ plugin = plugin->next_plugin;
+ }
+ if (!id) {
+ id = add_plugin_id(pluginname);
+ }
+ return (id);
+}
+
+static int get_type_id (char *typename) {
+ int id;
+ id = '\0';
+ type_t *type = type_list;
+ while (type) {
+ if (strcmp (typename, type->name) == 0) {
+ id = type->id;
+ break;
+ }
+ type = type->next_type;
+ }
+ if (!id) {
+ id = add_type_id(typename);
+ }
+ return (id);
+}
+
+static int get_dataset_id (data_source_t *ds, int type_id) {
+ int id;
+ id = '\0';
+ dataset_t *dataset = dataset_list;
+ while (dataset) {
+ if (strcmp (ds->name, dataset->name) == 0 && dataset->type_id == type_id) {
+ id = dataset->id;
+ break;
+ }
+ dataset = dataset->next_dataset;
+ }
+ if (!id) {
+ id = add_dataset_id(ds, type_id);
+ }
+ return (id);
+}
+
+static int write_mysql_write(const data_set_t *ds, const value_list_t *vl,
+ user_data_t __attribute__((unused)) *user_data) {
+ int i;
+ int host_id, plugin_id, type_id;
+ host_id = get_host_id((char*)vl->host);
+ plugin_id = get_plugin_id((char*)vl->plugin);
+ type_id = get_type_id((char*)vl->type);
+ gauge_t *rates = NULL;
+ for (i = 0; i < ds->ds_num; i++ ) {
+ char tmpquery[1024];
+ char query[1024], chartest;
+ int len;
+ data_source_t *dso = ds->ds + i;
+ int dataset_id = get_dataset_id(dso, type_id);
+ len = ssnprintf (tmpquery, sizeof (tmpquery), "INSERT INTO data "
+ "(timestamp,host_id,plugin_id,plugin_instance,type_id,typeinstance,dataset_id,value)"
+ "VALUES (%.3f,%d,%d,'%s',%d,'%s',%d,%%lf)", CDTIME_T_TO_DOUBLE (vl->time), host_id,
+ plugin_id, vl->plugin_instance, type_id, vl->type_instance, dataset_id );
+ if (dso->type == DS_TYPE_GAUGE) {
+ len = ssnprintf (query, sizeof (query), tmpquery, vl->values[i].gauge);
+ } else {
+ if (rates == NULL) {
+ rates = uc_get_rate (ds, vl);
+ }
+ if (isnan(rates[i])) { continue; }
+ len = ssnprintf (query, sizeof (query), tmpquery, rates[i]);
+ }
+ //INFO("toto: %d", toto);
+ pthread_mutex_lock(&mutex);
+ DEBUG("write_mysql plugin: %s", query);
+ mysql_real_query(conn, query, len);
+ pthread_mutex_unlock(&mutex);
+ }
+ return (0);
+}
+
+static int write_mysql_shutdown(void) {
+ mysql_close(conn);
+}
+
+void module_register (void) {
+ plugin_register_init ("write_mysql", write_mysql_init);
+ plugin_register_config ("write_mysql", write_mysql_config,
+ config_keys, config_keys_num);
+ plugin_register_write ("write_mysql", write_mysql_write, /* user_data = */ NULL);
+ plugin_register_shutdown ("write_mysql", write_mysql_shutdown);
+}
+
LoadPlugin write_mysql
<Plugin write_mysql>
Host "localhost"
User "root"
Passwd ""
Database "collectd"
</Plugin>
-- MySQL dump 10.11
--
-- Host: localhost Database: collectd
-- ------------------------------------------------------
-- Server version 5.0.77
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0
*/;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
--
-- Current Database: `collectd`
--
CREATE DATABASE /*!32312 IF NOT EXISTS*/ `collectd` /*!40100 DEFAULT CHARACTER
SET latin1 */;
USE `collectd`;
--
-- Table structure for table `data`
--
DROP TABLE IF EXISTS `data`;
SET @saved_cs_client = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `data` (
`id` bigint(20) NOT NULL auto_increment,
`timestamp` double NOT NULL,
`host_id` int(11) NOT NULL,
`plugin_id` int(11) NOT NULL,
`plugin_instance` varchar(255) default NULL,
`type_id` int(11) NOT NULL,
`typeinstance` varchar(255) default NULL,
`dataset_id` int(11) NOT NULL,
`value` double NOT NULL,
PRIMARY KEY (`id`),
KEY `timestamp` (`timestamp`),
KEY `host_id` (`host_id`),
KEY `plugin_id` (`plugin_id`),
KEY `type_id` (`type_id`),
KEY `typeinstance_id` (`typeinstance`),
KEY `dataset_id` (`dataset_id`),
CONSTRAINT `data_ibfk_1` FOREIGN KEY (`host_id`) REFERENCES `host` (`id`) ON
DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT `data_ibfk_2` FOREIGN KEY (`plugin_id`) REFERENCES `plugin` (`id`)
ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT `data_ibfk_3` FOREIGN KEY (`type_id`) REFERENCES `type` (`id`) ON
DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT `data_ibfk_4` FOREIGN KEY (`dataset_id`) REFERENCES `dataset`
(`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
--
-- Table structure for table `dataset`
--
DROP TABLE IF EXISTS `dataset`;
SET @saved_cs_client = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `dataset` (
`id` int(11) NOT NULL auto_increment,
`type_id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,
`type` enum('COUNTER','GAUGE','DERIVE','ABSOLUTE') NOT NULL,
`min` double NOT NULL,
`max` double NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `id` (`id`,`type_id`),
KEY `name` (`name`),
KEY `type_id` (`type_id`),
CONSTRAINT `dataset_ibfk_1` FOREIGN KEY (`type_id`) REFERENCES `type` (`id`)
ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
--
-- Table structure for table `host`
--
DROP TABLE IF EXISTS `host`;
SET @saved_cs_client = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `host` (
`id` int(11) NOT NULL auto_increment,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
--
-- Table structure for table `plugin`
--
DROP TABLE IF EXISTS `plugin`;
SET @saved_cs_client = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `plugin` (
`id` int(11) NOT NULL auto_increment,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`),
UNIQUE KEY `id` (`id`,`name`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
--
-- Table structure for table `type`
--
DROP TABLE IF EXISTS `type`;
SET @saved_cs_client = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `type` (
`id` int(11) NOT NULL auto_increment,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `name` (`name`),
UNIQUE KEY `id` (`id`,`name`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
-- Dump completed on 2011-11-07 12:57:15
_______________________________________________
collectd mailing list
[email protected]
http://mailman.verplant.org/listinfo/collectd