Hi all, 

For my needs at work, I wrote my second plugin for
collectd: write_mysql.

I hope it makes you more enthusiastic than my
first plugin did
(http://mailman.verplant.org/pipermail/collectd/2011-October/004766.html
[1]).

This plugin allows you to write data into MySQL using a relational
schema.

Attached:

- collectd-write_mysql.patch (the patch)

- write_mysql.conf (configuration file for collectd)

- mysql-collectd-scheme.sql (SQL that creates the database
and its tables)


Regards,

Cyril Feraudet

 


Links:
------
[1]
http://mailman.verplant.org/pipermail/collectd/2011-October/004766.html
--- ../collectd-5.0.0/configure.in	2011-03-27 18:35:08.000000000 +0200
+++ ./configure.in	2011-11-07 13:38:55.000000000 +0100
@@ -2242,7 +2242,7 @@
 fi
 if test "x$with_libmysql" = "xyes"
 then
-	with_mysql_libs=`$with_mysql_config --libs 2>/dev/null`
+	with_mysql_libs=`$with_mysql_config --libs_r 2>/dev/null`
 	mysql_config_status=$?
 
 	if test $mysql_config_status -ne 0
@@ -4713,6 +4713,7 @@
 AC_PLUGIN([vserver],     [$plugin_vserver],    [Linux VServer statistics])
 AC_PLUGIN([wireless],    [$plugin_wireless],   [Wireless statistics])
 AC_PLUGIN([write_http],  [$with_libcurl],      [HTTP output plugin])
+AC_PLUGIN([write_mysql], [$with_libmysql],     [MySQL output plugin])
 AC_PLUGIN([write_redis], [$with_libcredis],    [Redis output plugin])
 AC_PLUGIN([xmms],        [$with_libxmms],      [XMMS statistics])
 AC_PLUGIN([zfs_arc],     [$plugin_zfs_arc],    [ZFS ARC statistics])
@@ -5039,6 +5040,7 @@
     vserver . . . . . . . $enable_vserver
     wireless  . . . . . . $enable_wireless
     write_http  . . . . . $enable_write_http
+    write_mysql . . . . . $enable_write_mysql
     write_redis . . . . . $enable_write_redis
     xmms  . . . . . . . . $enable_xmms
     zfs_arc . . . . . . . $enable_zfs_arc
--- ../collectd-5.0.0/src/Makefile.am	2011-03-27 18:35:08.000000000 +0200
+++ ./src/Makefile.am	2011-11-07 13:40:01.000000000 +0100
@@ -259,6 +259,18 @@
 collectd_DEPENDENCIES += csv.la
 endif
 
+if BUILD_PLUGIN_WRITE_MYSQL
+pkglib_LTLIBRARIES += write_mysql.la
+write_mysql_la_SOURCES = write_mysql.c
+write_mysql_la_LDFLAGS = -module -avoid-version
+collectd_LDADD += "-dlopen" write_mysql.la
+if BUILD_WITH_LIBMYSQL
+write_mysql_la_CFLAGS = $(BUILD_WITH_LIBMYSQL_CFLAGS)
+write_mysql_la_LIBADD = $(BUILD_WITH_LIBMYSQL_LIBS) -lmysqlclient_r
+endif
+collectd_DEPENDENCIES += write_mysql.la
+endif
+
 if BUILD_PLUGIN_CURL
 pkglib_LTLIBRARIES += curl.la
 curl_la_SOURCES = curl.c
--- ../collectd-5.0.0/src/write_mysql.c	1970-01-01 01:00:00.000000000 +0100
+++ ./src/write_mysql.c	2011-11-07 13:34:41.000000000 +0100
@@ -0,0 +1,391 @@
+#include "collectd.h"
+#include "plugin.h"
+#include "common.h"
+#include "utils_cache.h"
+#ifdef HAVE_MYSQL_H
+#include <mysql.h>
+#elif defined(HAVE_MYSQL_MYSQL_H)
+#include <mysql/mysql.h>
+#endif
+#include <pthread.h>
+/* One cached row of the `host` table, kept in a singly linked list. */
+typedef struct host_s {
+	char name[DATA_MAX_NAME_LEN];	/* host name, the lookup key */
+	int id;				/* id of the row in the host table */
+	struct host_s *next_host;	/* next cache entry, NULL on the last one */
+} host_t;
+
+/* One cached row of the `plugin` table, kept in a singly linked list. */
+typedef struct plugin_s {
+	char name[DATA_MAX_NAME_LEN];	/* plugin name, the lookup key */
+	int id;				/* id of the row in the plugin table */
+	struct plugin_s *next_plugin;	/* next cache entry, NULL on the last one */
+} plugin_t;
+
+/* One cached row of the `type` table, kept in a singly linked list. */
+typedef struct type_s {
+	char name[DATA_MAX_NAME_LEN];	/* type name, the lookup key */
+	int id;				/* id of the row in the type table */
+	struct type_s *next_type;	/* next cache entry, NULL on the last one */
+} type_t;
+
+/* One cached row of the `dataset` table, keyed by name plus owning type id. */
+typedef struct dataset_s {
+	char name[DATA_MAX_NAME_LEN];	/* data-source name */
+	int id;				/* id of the row in the dataset table */
+	int type_id;			/* id of the owning row in the type table */
+	struct dataset_s *next_dataset;	/* next cache entry, NULL on the last one */
+} dataset_t;
+
+
+/* Option names accepted by write_mysql_config(). */
+static const char *config_keys[] =
+{
+	"Host", "User", "Passwd",
+	"Database", "Port"
+};
+
+/* Entry count of config_keys, passed along with it to plugin_register_config(). */
+static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
+
+static char *host = "localhost"; /* connection defaults; write_mysql_config() overrides them */
+static char *user = "root";
+static char *passwd = "";
+static char *database = "collectd";
+static int port = 0;
+
+static host_t *host_list = NULL; /* name -> id caches, one list per lookup table */
+static plugin_t *plugin_list = NULL;
+static type_t *type_list = NULL;
+static dataset_t *dataset_list = NULL;
+static MYSQL *conn; /* the single connection used by every callback of this plugin */
+static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; /* held around queries and cache inserts */
+
+/* Config callback: stores the value of one option from the plugin block.
+ * Returns 0 when the key was handled and -1 for an unknown key. */
+static int write_mysql_config (const char *key, const char *value) {
+	if (strcasecmp ("Host", key) == 0) { host = strdup (value); }
+	else if (strcasecmp ("User", key) == 0) { user = strdup (value); }
+	else if (strcasecmp ("Passwd", key) == 0) { passwd = strdup (value); }
+	else if (strcasecmp ("Database", key) == 0) { database = strdup (value); }
+	else if (strcasecmp ("Port", key) == 0) {
+		port = atoi (value); /* the value is text: convert it, do not store the pointer */
+	} else {
+		return (-1);
+	}
+	return (0);
+}
+/* Init callback: creates the handle and connects with the configured settings.
+ * Returns 0 once connected; otherwise unregisters the write callback and returns -1. */
+static int write_mysql_init (void) {
+	conn = mysql_init(NULL);
+	if (conn == NULL) {
+		ERROR("write_mysql plugin: mysql_init failed");
+	} else if (!mysql_thread_safe()) {
+		ERROR("write_mysql plugin: mysqlclient Thread Safe OFF");
+	} else if (mysql_real_connect(conn, host, user, passwd, database, port, NULL, 0) == NULL) {
+		ERROR("write_mysql plugin: Failed to connect to database %s  at server %s with user %s : %s", database, host, user, mysql_error (conn));
+	} else {
+		DEBUG("write_mysql plugin: mysqlclient Thread Safe ON");
+		return (0);
+	}
+	plugin_unregister_write ("write_mysql"); /* every failure path: no writes without a connection */
+	return (-1);
+}
+
+/* Slow path of get_host_id(): finds or creates the `host` row for hostname and
+ * caches it. Returns the row id, or 0 when the database supplied none. */
+static int add_host_id (char *hostname) {
+	int id = 0, len;
+	MYSQL_RES *result;
+	MYSQL_ROW row;
+	char query[1024], escaped[2 * DATA_MAX_NAME_LEN + 1];
+	host_t *newhost = malloc(sizeof(*newhost));
+	if (newhost == NULL) {
+		ERROR("write_mysql plugin: malloc failed");
+		return (0);
+	}
+	sstrncpy (newhost->name, hostname, sizeof(newhost->name));
+	pthread_mutex_lock(&mutex);
+	/* The name is embedded in SQL text, so quote characters must be escaped. */
+	mysql_real_escape_string(conn, escaped, newhost->name, strlen(newhost->name));
+	len = ssnprintf (query, sizeof (query), "SELECT id FROM host WHERE name = '%s'", escaped);
+	DEBUG("write_mysql plugin: %s", query);
+	/* A failed query has no result set; mysql_fetch_row() must never see NULL. */
+	result = (mysql_real_query(conn, query, len) == 0) ? mysql_store_result(conn) : NULL;
+	row = (result != NULL) ? mysql_fetch_row(result) : NULL;
+	if (row != NULL && row[0] != NULL) { id = atoi(row[0]); }
+	if (result != NULL) { mysql_free_result(result); }
+	if (id == 0) {
+		len = ssnprintf (query, sizeof (query), "INSERT INTO host (name) VALUES ('%s')", escaped);
+		DEBUG("write_mysql plugin: %s", query);
+		if (mysql_real_query(conn, query, len) == 0) { id = (int)mysql_insert_id(conn); }
+	}
+	if (id == 0) {
+		/* Caching id 0 would read as a miss on every lookup and grow the list. */
+		ERROR("write_mysql plugin: no id for host %s: %s", hostname, mysql_error (conn));
+		free(newhost);
+	} else {
+		newhost->id = id;
+		newhost->next_host = host_list; /* lookup order is irrelevant, so prepend in O(1) */
+		host_list = newhost;
+	}
+	pthread_mutex_unlock(&mutex); /* the only unlock: every path holds the lock until here */
+	return id;
+}
+		
+/* Slow path of get_plugin_id(): finds or creates the `plugin` row for pluginname
+ * and caches it. Returns the row id, or 0 when the database supplied none. */
+static int add_plugin_id (char *pluginname) {
+	int id = 0, len;
+	MYSQL_RES *result;
+	MYSQL_ROW row;
+	char query[1024], escaped[2 * DATA_MAX_NAME_LEN + 1];
+	plugin_t *newplugin = malloc(sizeof(*newplugin));
+	if (newplugin == NULL) {
+		ERROR("write_mysql plugin: malloc failed");
+		return (0);
+	}
+	sstrncpy (newplugin->name, pluginname, sizeof(newplugin->name));
+	pthread_mutex_lock(&mutex);
+	/* The name is embedded in SQL text, so quote characters must be escaped. */
+	mysql_real_escape_string(conn, escaped, newplugin->name, strlen(newplugin->name));
+	len = ssnprintf (query, sizeof (query), "SELECT id FROM plugin WHERE name = '%s'", escaped);
+	DEBUG("write_mysql plugin: %s", query);
+	/* A failed query has no result set; mysql_fetch_row() must never see NULL. */
+	result = (mysql_real_query(conn, query, len) == 0) ? mysql_store_result(conn) : NULL;
+	row = (result != NULL) ? mysql_fetch_row(result) : NULL;
+	if (row != NULL && row[0] != NULL) { id = atoi(row[0]); }
+	if (result != NULL) { mysql_free_result(result); }
+	if (id == 0) {
+		len = ssnprintf (query, sizeof (query), "INSERT INTO plugin (name) VALUES ('%s')", escaped);
+		DEBUG("write_mysql plugin: %s", query);
+		if (mysql_real_query(conn, query, len) == 0) { id = (int)mysql_insert_id(conn); }
+	}
+	if (id == 0) {
+		/* Caching id 0 would read as a miss on every lookup and grow the list. */
+		ERROR("write_mysql plugin: no id for plugin %s: %s", pluginname, mysql_error (conn));
+		free(newplugin);
+	} else {
+		newplugin->id = id;
+		newplugin->next_plugin = plugin_list; /* lookup order is irrelevant, so prepend in O(1) */
+		plugin_list = newplugin;
+	}
+	pthread_mutex_unlock(&mutex); /* the only unlock: every path holds the lock until here */
+	return id;
+}
+		
+/* Slow path of get_type_id(): finds or creates the `type` row for typename and
+ * caches it. Returns the row id, or 0 when the database supplied none. */
+static int add_type_id (char *typename) {
+	int id = 0, len;
+	MYSQL_RES *result;
+	MYSQL_ROW row;
+	char query[1024], escaped[2 * DATA_MAX_NAME_LEN + 1];
+	type_t *newtype = malloc(sizeof(*newtype));
+	if (newtype == NULL) {
+		ERROR("write_mysql plugin: malloc failed");
+		return (0);
+	}
+	sstrncpy (newtype->name, typename, sizeof(newtype->name));
+	pthread_mutex_lock(&mutex);
+	/* The name is embedded in SQL text, so quote characters must be escaped. */
+	mysql_real_escape_string(conn, escaped, newtype->name, strlen(newtype->name));
+	len = ssnprintf (query, sizeof (query), "SELECT id FROM type WHERE name = '%s'", escaped);
+	DEBUG("write_mysql plugin: %s", query);
+	/* A failed query has no result set; mysql_fetch_row() must never see NULL. */
+	result = (mysql_real_query(conn, query, len) == 0) ? mysql_store_result(conn) : NULL;
+	row = (result != NULL) ? mysql_fetch_row(result) : NULL;
+	if (row != NULL && row[0] != NULL) { id = atoi(row[0]); }
+	if (result != NULL) { mysql_free_result(result); }
+	if (id == 0) {
+		len = ssnprintf (query, sizeof (query), "INSERT INTO type (name) VALUES ('%s')", escaped);
+		DEBUG("write_mysql plugin: %s", query);
+		if (mysql_real_query(conn, query, len) == 0) { id = (int)mysql_insert_id(conn); }
+	}
+	if (id == 0) {
+		/* Caching id 0 would read as a miss on every lookup and grow the list. */
+		ERROR("write_mysql plugin: no id for type %s: %s", typename, mysql_error (conn));
+		free(newtype);
+	} else {
+		newtype->id = id;
+		newtype->next_type = type_list; /* lookup order is irrelevant, so prepend in O(1) */
+		type_list = newtype;
+	}
+	pthread_mutex_unlock(&mutex); /* the only unlock: every path holds the lock until here */
+	return id;
+}
+		
+/* Slow path of get_dataset_id(): finds or creates the `dataset` row for data
+ * source ds under type_id and caches it. Returns the row id, or 0 on failure. */
+static int add_dataset_id (data_source_t *ds, int type_id) {
+	int id = 0, len;
+	MYSQL_RES *result;
+	MYSQL_ROW row;
+	char query[1024], escaped[2 * DATA_MAX_NAME_LEN + 1];
+	const char *type;
+	dataset_t *newdataset = malloc(sizeof(*newdataset));
+	if (newdataset == NULL) {
+		ERROR("write_mysql plugin: malloc failed");
+		return (0);
+	}
+	sstrncpy (newdataset->name, ds->name, sizeof(newdataset->name));
+	switch(ds->type) {
+		case DS_TYPE_COUNTER:
+			type = "COUNTER";
+		break;
+		case DS_TYPE_DERIVE:
+			type = "DERIVE";
+		break;
+		case DS_TYPE_ABSOLUTE:
+			type = "ABSOLUTE";
+		break;
+		default:
+			type = "GAUGE";
+		break;
+	}
+	pthread_mutex_lock(&mutex);
+	/* The name is embedded in SQL text, so quote characters must be escaped. */
+	mysql_real_escape_string(conn, escaped, newdataset->name, strlen(newdataset->name));
+	len = ssnprintf (query, sizeof (query), "SELECT id FROM dataset WHERE name = '%s' AND type_id = '%d'", escaped, type_id);
+	DEBUG("write_mysql plugin: %s", query);
+	/* A failed query has no result set; mysql_fetch_row() must never see NULL. */
+	result = (mysql_real_query(conn, query, len) == 0) ? mysql_store_result(conn) : NULL;
+	row = (result != NULL) ? mysql_fetch_row(result) : NULL;
+	if (row != NULL && row[0] != NULL) { id = atoi(row[0]); }
+	if (result != NULL) { mysql_free_result(result); }
+	if (id == 0) {
+		/* NOTE(review): min/max are printed with %f; confirm how NaN bounds should be stored. */
+		len = ssnprintf (query, sizeof (query), "INSERT INTO dataset (name,type_id,type,min,max) VALUES ('%s','%d','%s','%f','%f')", escaped, type_id, type, ds->min, ds->max);
+		DEBUG("write_mysql plugin: %s", query);
+		if (mysql_real_query(conn, query, len) == 0) { id = (int)mysql_insert_id(conn); }
+	}
+	if (id == 0) { /* id 0 is never cached: it would read as a miss on every lookup */
+		ERROR("write_mysql plugin: no id for dataset %s: %s", ds->name, mysql_error (conn));
+		free(newdataset);
+	} else {
+		newdataset->id = id;
+		newdataset->type_id = type_id;
+		newdataset->next_dataset = dataset_list; /* lookup order is irrelevant, so prepend in O(1) */
+		dataset_list = newdataset;
+	}
+	pthread_mutex_unlock(&mutex); /* the only unlock: every path holds the lock until here */
+	return id;
+}
+		
+/* Return the database id for hostname: served from the in-memory cache when
+ * present, otherwise resolved through add_host_id().
+ * NOTE(review): the list is walked without taking `mutex`. */
+static int get_host_id (char *hostname) {
+	host_t *entry;
+	int found = 0;
+	for (entry = host_list; entry != NULL; entry = entry->next_host) {
+		if (strcmp (hostname, entry->name) != 0) {
+			continue;
+		}
+		found = entry->id;
+		break;
+	}
+	/* An id of 0 means "not cached"; fall back to the database. */
+	return (found != 0) ? found : add_host_id(hostname);
+}
+
+/* Return the database id for pluginname: served from the in-memory cache when
+ * present, otherwise resolved through add_plugin_id().
+ * NOTE(review): the list is walked without taking `mutex`. */
+static int get_plugin_id (char *pluginname) {
+	plugin_t *entry;
+	int found = 0;
+	for (entry = plugin_list; entry != NULL; entry = entry->next_plugin) {
+		if (strcmp (pluginname, entry->name) != 0) {
+			continue;
+		}
+		found = entry->id;
+		break;
+	}
+	/* An id of 0 means "not cached"; fall back to the database. */
+	return (found != 0) ? found : add_plugin_id(pluginname);
+}
+
+/* Return the database id for typename: served from the in-memory cache when
+ * present, otherwise resolved through add_type_id().
+ * NOTE(review): the list is walked without taking `mutex`. */
+static int get_type_id (char *typename) {
+	type_t *entry;
+	int found = 0;
+	for (entry = type_list; entry != NULL; entry = entry->next_type) {
+		if (strcmp (typename, entry->name) != 0) {
+			continue;
+		}
+		found = entry->id;
+		break;
+	}
+	/* An id of 0 means "not cached"; fall back to the database. */
+	return (found != 0) ? found : add_type_id(typename);
+}
+
+/* Return the database id for data source ds under type_id: served from the
+ * in-memory cache when present, otherwise resolved through add_dataset_id().
+ * NOTE(review): the list is walked without taking `mutex`. */
+static int get_dataset_id (data_source_t *ds, int type_id) {
+	dataset_t *entry;
+	int found = 0;
+	for (entry = dataset_list; entry != NULL; entry = entry->next_dataset) {
+		if (strcmp (ds->name, entry->name) != 0 || entry->type_id != type_id) {
+			continue;
+		}
+		found = entry->id;
+		break;
+	}
+	/* An id of 0 means "not cached"; fall back to the database. */
+	return (found != 0) ? found : add_dataset_id(ds, type_id);
+}
+
+/* Write callback: inserts one `data` row per data source in vl. Gauges are
+ * stored as-is, other types as rates; NaN values are skipped. Returns 0. */
+static int write_mysql_write(const data_set_t *ds, const value_list_t *vl,
+		user_data_t __attribute__((unused)) *user_data) {
+	int i, len;
+	char query[1024];
+	char plugin_inst[2 * DATA_MAX_NAME_LEN + 1], type_inst[2 * DATA_MAX_NAME_LEN + 1]; /* assumes instance names are shorter than DATA_MAX_NAME_LEN - TODO confirm */
+	gauge_t value, *rates = NULL;
+	int host_id = get_host_id((char*)vl->host);
+	int plugin_id = get_plugin_id((char*)vl->plugin);
+	int type_id = get_type_id((char*)vl->type);
+	pthread_mutex_lock(&mutex);
+	mysql_real_escape_string(conn, plugin_inst, vl->plugin_instance, strlen(vl->plugin_instance));
+	mysql_real_escape_string(conn, type_inst, vl->type_instance, strlen(vl->type_instance));
+	pthread_mutex_unlock(&mutex);
+	for (i = 0; i < ds->ds_num; i++ ) {
+		data_source_t *dso = ds->ds + i;
+		int dataset_id = get_dataset_id(dso, type_id);
+		if ((dso->type != DS_TYPE_GAUGE) && (rates == NULL)) { rates = uc_get_rate (ds, vl); }
+		if ((dso->type != DS_TYPE_GAUGE) && (rates == NULL)) { continue; } /* no rate available */
+		value = (dso->type == DS_TYPE_GAUGE) ? vl->values[i].gauge : rates[i];
+		if (isnan(value)) { continue; }
+		/* Single formatting pass, so a '%' in an instance name is plain data. */
+		len = ssnprintf (query, sizeof (query), "INSERT INTO data "
+		"(timestamp,host_id,plugin_id,plugin_instance,type_id,typeinstance,dataset_id,value)"
+		"VALUES (%.3f,%d,%d,'%s',%d,'%s',%d,%lf)", CDTIME_T_TO_DOUBLE (vl->time), host_id,
+		plugin_id, plugin_inst, type_id, type_inst, dataset_id, value);
+		pthread_mutex_lock(&mutex);
+		DEBUG("write_mysql plugin: %s", query);
+		if (mysql_real_query(conn, query, len) != 0) { ERROR("write_mysql plugin: INSERT failed: %s", mysql_error (conn)); }
+		pthread_mutex_unlock(&mutex);
+	}
+	free (rates); /* the array returned by uc_get_rate() belongs to the caller */
+	return (0);
+}
+
+static int write_mysql_shutdown(void) {
+	mysql_close(conn);	/* shutdown callback: releases the connection handle */
+	return (0);		/* a non-void callback must return a defined status */
+}
+/* Plugin entry point: registers this plugin's init, config, write and shutdown callbacks. */
+void module_register (void) {
+	const char *plugin_name = "write_mysql";
+	plugin_register_init (plugin_name, write_mysql_init);
+	plugin_register_config (plugin_name, write_mysql_config, config_keys, config_keys_num);
+	plugin_register_write (plugin_name, write_mysql_write, NULL /* user_data */);
+	plugin_register_shutdown (plugin_name, write_mysql_shutdown);
+}
LoadPlugin write_mysql

<Plugin write_mysql>
        Host            "localhost"
        User            "root"
        Passwd          ""
        Database        "collectd"
</Plugin>


-- MySQL dump 10.11
--
-- Host: localhost    Database: collectd
-- ------------------------------------------------------
-- Server version       5.0.77

/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;
/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */;
/*!40103 SET TIME_ZONE='+00:00' */;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 
*/;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

--
-- Current Database: `collectd`
--

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `collectd` /*!40100 DEFAULT CHARACTER 
SET latin1 */;

USE `collectd`;

--
-- Table structure for table `data`
--

DROP TABLE IF EXISTS `data`;
SET @saved_cs_client     = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `data` (
  `id` bigint(20) NOT NULL auto_increment,
  `timestamp` double NOT NULL,
  `host_id` int(11) NOT NULL,
  `plugin_id` int(11) NOT NULL,
  `plugin_instance` varchar(255) default NULL,
  `type_id` int(11) NOT NULL,
  `typeinstance` varchar(255) default NULL,
  `dataset_id` int(11) NOT NULL,
  `value` double NOT NULL,
  PRIMARY KEY  (`id`),
  KEY `timestamp` (`timestamp`),
  KEY `host_id` (`host_id`),
  KEY `plugin_id` (`plugin_id`),
  KEY `type_id` (`type_id`),
  KEY `typeinstance_id` (`typeinstance`),
  KEY `dataset_id` (`dataset_id`),
  CONSTRAINT `data_ibfk_1` FOREIGN KEY (`host_id`) REFERENCES `host` (`id`) ON 
DELETE CASCADE ON UPDATE CASCADE,
  CONSTRAINT `data_ibfk_2` FOREIGN KEY (`plugin_id`) REFERENCES `plugin` (`id`) 
ON DELETE CASCADE ON UPDATE CASCADE,
  CONSTRAINT `data_ibfk_3` FOREIGN KEY (`type_id`) REFERENCES `type` (`id`) ON 
DELETE CASCADE ON UPDATE CASCADE,
  CONSTRAINT `data_ibfk_4` FOREIGN KEY (`dataset_id`) REFERENCES `dataset` 
(`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=51 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;

--
-- Table structure for table `dataset`
--

DROP TABLE IF EXISTS `dataset`;
SET @saved_cs_client     = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `dataset` (
  `id` int(11) NOT NULL auto_increment,
  `type_id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  `type` enum('COUNTER','GAUGE','DERIVE','ABSOLUTE') NOT NULL,
  `min` double NOT NULL,
  `max` double NOT NULL,
  PRIMARY KEY  (`id`),
  UNIQUE KEY `id` (`id`,`type_id`),
  KEY `name` (`name`),
  KEY `type_id` (`type_id`),
  CONSTRAINT `dataset_ibfk_1` FOREIGN KEY (`type_id`) REFERENCES `type` (`id`) 
ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB AUTO_INCREMENT=13 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;

--
-- Table structure for table `host`
--

DROP TABLE IF EXISTS `host`;
SET @saved_cs_client     = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `host` (
  `id` int(11) NOT NULL auto_increment,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY  (`id`),
  UNIQUE KEY `name` (`name`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;

--
-- Table structure for table `plugin`
--

DROP TABLE IF EXISTS `plugin`;
SET @saved_cs_client     = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `plugin` (
  `id` int(11) NOT NULL auto_increment,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY  (`id`),
  UNIQUE KEY `name` (`name`),
  UNIQUE KEY `id` (`id`,`name`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;

--
-- Table structure for table `type`
--

DROP TABLE IF EXISTS `type`;
SET @saved_cs_client     = @@character_set_client;
SET character_set_client = utf8;
CREATE TABLE `type` (
  `id` int(11) NOT NULL auto_increment,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY  (`id`),
  UNIQUE KEY `name` (`name`),
  UNIQUE KEY `id` (`id`,`name`)
) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
SET character_set_client = @saved_cs_client;
/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

-- Dump completed on 2011-11-07 12:57:15
_______________________________________________
collectd mailing list
[email protected]
http://mailman.verplant.org/listinfo/collectd

Reply via email to