Hi,
This continues on from my last patch for prepared statements.
This patch makes flow-export use bulk (multi-row) inserts. I have been
talking with the helpful people on the MySQL IRC channel. Once
I had written this, what used to take 80 min to load into the
database now takes 20 min. It is a huge and very noticeable
improvement.
Compare format3 with this format6 (be careful of the differing
field types).
There is a #define, BULK_INSERT_COUNT, that you can tweak to try to
get the most out of this patch.
Too small and you spend your life iterating a loop, as before;
too big and you may exceed some (network) I/O limit (fragmenting the data).
I have yet to determine the optimum value for my system; if you
have any ideas, let me know.
Thorben
Note: this depends on the previous patches.
--- a/src/flow-export.c 2005-10-11 08:56:28.000000000 +0100
+++ b/src/flow-export.c 2005-10-11 08:59:09.000000000 +0100
@@ -58,8 +58,12 @@
#define DB_DEFAULT_DBUSER "netflow"
#define DB_DEFAULT_DBPWD "netflow"
+/* The most fields in a flow */
#define MAX_FIELD_COUNT 33
+/* Tune this to your needs for best speed */
+#define BULK_INSERT_COUNT 100
+
int fmt_xfields_bind(u_int64 xfield, MYSQL_BIND *bind, struct fts3rec_all *cur);
#endif /* MYSQL */
@@ -92,6 +96,7 @@
struct options {
char dbaseURI[256];
char dbtag[128];
+ int dblock;
u_int32 cflowd_mask;
u_int64 ft_mask;
u_long records;
@@ -142,11 +147,12 @@
opt.cflowd_mask = 0xFFFFFFFFL;
opt.ft_mask = 0xFFFFFFFFFFFFFFFFLL;
+ opt.dblock = 0;
/* profile */
ftprof_start (&ftp);
- while ((i = getopt(argc, argv, "h?d:f:m:t:u:")) != -1)
+ while ((i = getopt(argc, argv, "h?d:f:lm:t:u:")) != -1)
switch (i) {
@@ -164,6 +170,10 @@
exit (0);
break;
+ case 'l':
+ opt.dblock = 1;
+ break;
+
case 'm': /* cflowd mask */
if (isalpha((int)optarg[0])) {
ascii_mask = 1;
@@ -983,19 +993,19 @@
#ifdef HAVE_MYSQL
struct fts3rec_offsets fo;
struct ftver ftv;
- struct fts3rec_all cur;
- char fields[1024], query[3*1024];
+ struct fts3rec_all flowrec[BULK_INSERT_COUNT];
+ char fields[1024], query[BULK_INSERT_COUNT*MAX_FIELD_COUNT*2+1024], query_v[128];
char *rec;
char *db_host, *db_name, *db_table, *db_user, *db_pwd, *db_tmp, *tmp;
int db_port;
- int len,count,i;
+ int len,count,i,j;
MYSQL *mysql;
MYSQL_STMT *stmt;
- MYSQL_BIND bind[MAX_FIELD_COUNT+1];
+ MYSQL_BIND bind[BULK_INSERT_COUNT*(MAX_FIELD_COUNT+1)];
char *tag_field, *tag_value;
- int tag_length;
+ unsigned long tag_length;
db_host = DB_DEFAULT_DBHOST;
db_name = DB_DEFAULT_DBNAME;
@@ -1018,13 +1028,11 @@
db_table = strsep(&tmp, ":");
db_port = atoi(db_tmp);
- if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table) {
- fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
- return -1;
- }
+ if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table)
+ fterr_errx(1,"Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
} /* dbaseURI */
-
+
ftio_get_ver(ftio, &ftv);
fts3rec_compute_offsets(&fo, &ftv);
@@ -1034,37 +1042,44 @@
/* generate the field names once */
len = fmt_xfields_type(fields, opt->ft_mask);
-
- memset(bind, 0, sizeof(bind));
- count = fmt_xfields_bind(opt->ft_mask, bind, &cur);
/* parse tag string*/
+ tag_field = NULL;
if (strlen(opt->dbtag)) {
tmp = opt->dbtag;
tag_field = strsep(&tmp, "=");
tag_value = tmp;
- if (!tag_field || !tag_value) {
- fterr_warnx("DB Tag format error, expecting Field=Value.");
- return -1;
- }
+ if (!tag_field || !tag_value)
+ fterr_errx(1,"DB Tag format error, expecting Field=Value.");
- if ((len + strlen(tag_field)+1) >= sizeof(fields)) {
- fterr_warnx("Buffer too small for tag field.");
- return -1;
- }
+ if ((len + strlen(tag_field)+1) >= sizeof(fields))
+ fterr_errx(1,"Buffer too small for tag field.");
strcat(fields, ",");
strcat(fields, tag_field);
tag_length = strlen(tag_value);
- bind[count].buffer_type= MYSQL_TYPE_STRING;
- bind[count].buffer = tag_value;
- bind[count].buffer_length = tag_length;
- bind[count].is_null= 0;
- bind[count].length = &tag_length;
- count++;
+ }
+
+ /*bind flowrec fields to mysql */
+ i = 0;
+ memset(bind, 0, sizeof(bind));
+ for (j=0; j<BULK_INSERT_COUNT; j++) {
+
+ i += fmt_xfields_bind(opt->ft_mask, &bind[i], &flowrec[j]);
+
+ if (tag_field) {
+ bind[i].buffer_type = MYSQL_TYPE_STRING;
+ bind[i].buffer = tag_value;
+ bind[i].buffer_length = tag_length;
+ bind[i].is_null= 0;
+ bind[i].length = &tag_length;
+ i++;
+ }
+
+ if (0==j) count = i;
}
@@ -1073,48 +1088,89 @@
if (!(mysql = mysql_init(mysql)))
fterr_errx(1, "mysql_init(): failed");
- if (mysql_options(mysql, MYSQL_READ_DEFAULT_GROUP, "simple"))
- fterr_errx(1, "mysql_options(): %s", mysql_error(mysql));
-
- if (mysql_real_connect(mysql, db_host, db_user, db_pwd,
- db_name, db_port, NULL, 0) == NULL)
+ if (mysql_real_connect(mysql, db_host, db_user, db_pwd, db_name, db_port, NULL, 0) == NULL)
fterr_errx(1,"mysql_real_connect(): %s\n", mysql_error(mysql));
if (!(stmt = mysql_stmt_init(mysql)))
- fterr_errx(1, "mysql_stmt_init(): failed");
+ fterr_errx(1, "mysql_stmt_init(): %s\n", mysql_error(mysql));
- query[0] = 0;
- strcat(query, "INSERT INTO ");
- strcat(query, db_table);
- strcat (query, "(");
- strcat (query, fields);
- strcat (query, ") VALUES (");
+
+ /*One set of inserts*/
+ query_v[0] = 0;
+ strcat(query_v, "(");
for (i=0; i<count; i++) {
- strcat(query, "?");
- if (i+1<count) strcat(query, ", ");
+ strcat(query_v, "?");
+ if (i+1<count) strcat(query_v, ",");
}
- strcat (query, ")");
-
+ strcat(query_v, ")");
+
+ /* x BULK_INSERT_COUNT insert query */
+ snprintf(query, sizeof(query), "INSERT INTO %s (%s) VALUES ", db_table, fields);
+ for (i=0; i<BULK_INSERT_COUNT; i++) {
+ strncat(query, query_v, sizeof(query) - strlen(query));
+ if (i+1<BULK_INSERT_COUNT) strncat(query, ",", sizeof(query) - strlen(query));
+ }
+
if (mysql_stmt_prepare(stmt, query, strlen(query)))
fterr_errx(1,"mysql_stmt_prepare(): %s\n", mysql_stmt_error(stmt));
if (mysql_stmt_bind_param(stmt, bind))
fterr_errx(1,"mysql_stmt_bind_param(): %s\n", mysql_stmt_error(stmt));
-
+
+ if (opt->dblock) {
+ /*Get a write lock. speed up MyISAM inserts, not best for InnoDB*/
+ snprintf(query, sizeof(query), "LOCK TABLE %s WRITE;", db_table);
+ if (mysql_real_query(mysql, query, strlen(query)) != 0)
+ fterr_warnx("mysql_real_query(): %s", mysql_error(mysql));
+ }
+
/* foreach flow */
+ count = 0;
while ((rec = ftio_read(ftio))) {
+
+ fill_rec_all(opt->ft_mask, &flowrec[count], rec, &fo);
- fill_rec_all(opt->ft_mask, &cur, rec, &fo);
+ if (BULK_INSERT_COUNT-1 == count) {
- if (mysql_stmt_execute(stmt))
- fterr_warnx("mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
+ if (mysql_stmt_execute(stmt))
+ fterr_errx(1,"mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
+ count = 0;
+ }
+ else count++;
++opt->records;
} /* while */
- /* close database */
+ /* insert remainder */
+ if (0 < count) {
+
+ snprintf(query, sizeof(query), "INSERT INTO %s (%s) VALUES ", db_table, fields);
+ for (i=0; i<count; i++) {
+ strncat(query, query_v, sizeof(query) - strlen(query));
+ if (i+1<count) strncat(query, ",", sizeof(query) - strlen(query));
+ }
+
+ if (mysql_stmt_prepare(stmt, query, strlen(query)))
+ fterr_errx(1,"mysql_stmt_prepare(): %s\n", mysql_stmt_error(stmt));
+
+ if (mysql_stmt_bind_param(stmt, bind))
+ fterr_errx(1,"mysql_stmt_bind_param(): %s\n", mysql_stmt_error(stmt));
+
+ if (mysql_stmt_execute(stmt))
+ fterr_errx(1,"mysql_stmt_execute(): %s", mysql_stmt_error(stmt));
+
+ }
+
+ if (opt->dblock) {
+ /*Release our locks*/
+ snprintf(query, sizeof(query), "UNLOCK TABLES;");
+ if (mysql_real_query(mysql, query, strlen(query)) != 0)
+ fterr_warnx("mysql_real_query(): %s", mysql_error(mysql));
+ }
+
+ /* close database */
mysql_stmt_close(stmt);
mysql_close(mysql);
_______________________________________________
Flow-tools mailing list
[EMAIL PROTECTED]
http://mailman.splintered.net/mailman/listinfo/flow-tools