more than 100 times faster, as for me :)
+ ' quotation instead of "
+ error with db_port in format 5
patch is for version 0.67 but works with 0.68 too
Alexey Bestchekov
--- flow-tools-0.67_orig/src/flow-export.c 2005-11-24 01:51:39.000000000 +0300
+++ flow-tools-0.67/src/flow-export.c 2005-11-24 01:47:57.000000000 +0300
@@ -64,12 +64,12 @@
#include <libpq-fe.h>
-#define DB_DEFAULT_DBHOST "localhost"
-#define DB_DEFAULT_DBNAME "netflow"
-#define DB_DEFAULT_DBPORT "5432"
-#define DB_DEFAULT_DBTABLE "raw"
-#define DB_DEFAULT_DBUSER "netflow"
-#define DB_DEFAULT_DBPWD "netflow"
+#define DB_PG_DEFAULT_DBHOST "localhost"
+#define DB_PG_DEFAULT_DBNAME "netflow"
+#define DB_PG_DEFAULT_DBPORT "5432"
+#define DB_PG_DEFAULT_DBTABLE "raw"
+#define DB_PG_DEFAULT_DBUSER "netflow"
+#define DB_PG_DEFAULT_DBPWD "netflow"
#endif /* PGSQL*/
@@ -103,18 +104,19 @@
int format3(struct ftio *ftio, struct options *opt);
int format4(struct ftio *ftio, struct options *opt);
int format5(struct ftio *ftio, struct options *opt);
+int format6(struct ftio *ftio, struct options *opt);
int ftxfield_tocflow(u_int64 xfields, u_int32 *cfmask);
int fmt_xfields_val(char *fmt_buf, char *rec, struct fts3rec_offsets *fo,
- u_int64 xfields, int quote);
+ u_int64 xfields, int quote, char comma_char );
int fmt_xfields_type(char *buf, u_int64 xfield);
void usage(void);
-#define NFORMATS 6 /* nformats - 1 */
+#define NFORMATS 7 /* nformats - 1 */
struct jump format[] = {{format0}, {format1}, {format2}, {format3},
- {format4}, {format5}};
+ {format4}, {format5}, {format6}};
int main(int argc, char **argv)
{
@@ -663,7 +665,7 @@
while ((rec = ftio_read(ftio))) {
- len = fmt_xfields_val(fmt_buf, rec, &fo, opt->ft_mask, 0);
+ len = fmt_xfields_val(fmt_buf, rec, &fo, opt->ft_mask, 0, ',');
if (len)
printf("%s\n", fmt_buf);
@@ -746,7 +748,7 @@
/* foreach flow */
while ((rec = ftio_read(ftio))) {
- len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1);
+ len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1, ',');
/* form SQL query and execute it */
if (len) {
@@ -864,19 +866,19 @@
struct ftver ftv;
char fields[1024], values[1024], query[3*1024];
char *rec;
- char *db_host, *db_name, *db_table, *db_user, *db_pwd, *db_tmp, *tmp;
+ char *db_host, *db_name, *db_table, *db_user, *db_pwd, *tmp;
char *db_port;
int len;
PGconn *conn;
PGresult *res;
- db_host = DB_DEFAULT_DBHOST;
- db_name = DB_DEFAULT_DBNAME;
- db_port = DB_DEFAULT_DBPORT;
- db_user = DB_DEFAULT_DBUSER;
- db_table = DB_DEFAULT_DBTABLE;
- db_pwd = DB_DEFAULT_DBPWD;
+ db_host = DB_PG_DEFAULT_DBHOST;
+ db_name = DB_PG_DEFAULT_DBNAME;
+ db_port = DB_PG_DEFAULT_DBPORT;
+ db_user = DB_PG_DEFAULT_DBUSER;
+ db_table = DB_PG_DEFAULT_DBTABLE;
+ db_pwd = DB_PG_DEFAULT_DBPWD;
/* parse URI string */
@@ -891,7 +893,7 @@
db_name = strsep(&tmp, ":");
db_table = strsep(&tmp, ":");
- if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table) {
+ if (!db_user || !db_pwd || !db_host || !db_port || !db_name || !db_table) {
fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
return -1;
}
@@ -917,7 +919,7 @@
/* foreach flow */
while ((rec = ftio_read(ftio))) {
- len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1);
+ len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1, ',');
/* form SQL query and execute it */
if (len) {
@@ -958,9 +960,134 @@
} /* format5 */
+/*
+ * function: format6
+ *
+ * export flows (copy) into PostgreSQL Database
+ */
+int format6(struct ftio *ftio, struct options *opt)
+{
+#ifdef HAVE_PGSQL
+ struct fts3rec_offsets fo;
+ struct ftver ftv;
+ char fields[1024], values[1024], query[3*1024];
+ char *rec;
+ char *db_host, *db_name, *db_table, *db_user, *db_pwd, *tmp;
+ char *db_port;
+ int len;
+
+ PGconn *conn;
+ PGresult *res;
+
+ db_host = DB_PG_DEFAULT_DBHOST;
+ db_name = DB_PG_DEFAULT_DBNAME;
+ db_port = DB_PG_DEFAULT_DBPORT;
+ db_user = DB_PG_DEFAULT_DBUSER;
+ db_table = DB_PG_DEFAULT_DBTABLE;
+ db_pwd = DB_PG_DEFAULT_DBPWD;
+
+ /* parse URI string */
+
+ if (strlen(opt->dbaseURI)) {
+
+ tmp = opt->dbaseURI;
+
+ db_user = strsep(&tmp, ":");
+ db_pwd = strsep(&tmp, ":");
+ db_host = strsep(&tmp, ":");
+ db_port = strsep(&tmp, ":");
+ db_name = strsep(&tmp, ":");
+ db_table = strsep(&tmp, ":");
+
+ if (!db_user || !db_pwd || !db_host || !db_port || !db_name || !db_table) {
+ fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
+ return -1;
+ }
+
+ } /* dbaseURI */
+
+ ftio_get_ver(ftio, &ftv);
+
+ fts3rec_compute_offsets(&fo, &ftv);
+
+ /* remove invalid fields */
+ opt->ft_mask &= ftrec_xfield(&ftv);
+
+ /* generate the field names once */
+ len = fmt_xfields_type(fields, opt->ft_mask);
+
+ /* open PostgreSQL database */
+ conn = PQsetdbLogin(db_host, db_port, (char *) NULL, (char *) NULL, db_name, db_user, db_pwd);
+
+ if (PQstatus(conn) == CONNECTION_BAD)
+ fterr_errx(1,"PQsetdbLogin(): %s\n", PQerrorMessage(conn));
+
+ if (len) {
+ strcpy (query, "COPY ");
+ strcat (query, db_table);
+ strcat (query, "(");
+ strcat (query, fields);
+ strcat (query, ")");
+ strcat (query, " FROM STDIN");
+ }
+
+ if (debug)
+ fprintf(stderr, "field=%s\n query=%s\n", fields, query);
+
+ res = PQexec(conn, query);
+// if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
+// PQclear(res);
+// fterr_errx(1,"PQexec(): %s\n", PQerrorMessage(conn));
+// }
+
+ /* foreach flow */
+ while ((rec = ftio_read(ftio))) {
+
+ len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 0, '\t');
+
+ /* form SQL query and execute it */
+ if (len) {
+ if (debug)
+ fprintf(stderr, "%s\n", values);
+
+ strcpy (query, values);
+ strcat (query, "\n");
+
+ if (PQputline(conn, query)) {
+ fterr_errx(1,"PQputline(): %s\n", PQerrorMessage(conn));
+ }
+
+ }
+
+ ++opt->records;
+
+ } /* while */
+
+ if (debug)
+ fprintf(stderr, "\\.\n");
+ if (PQputline(conn,"\\.\n")) {
+ fterr_errx(1,"PQputline(): %s\n", PQerrorMessage(conn));
+ }
+
+ if (PQendcopy(conn)) {
+ fterr_errx(1,"PQendcopy(): %s\n", PQerrorMessage(conn));
+ }
+
+ /* close database */
+ PQfinish(conn);
+
+#else /* PGSQL */
+
+ fterr_warnx("Format not supported");
+
+#endif /* PGSQL */
+
+ return 0;
+
+} /* format6 */
+
int fmt_xfields_type(char *buf, u_int64 xfield)
{
- int comma;
+ int comma=0;
buf[0] = 0;
@@ -1168,7 +1295,7 @@
int fmt_xfields_val(char *fmt_buf, char *rec, struct fts3rec_offsets *fo,
- u_int64 xfields, int quote)
+ u_int64 xfields, int quote, char comma_char )
{
int comma, len;
@@ -1177,243 +1304,243 @@
len = comma = 0;
if (xfields & FT_XFIELD_UNIX_SECS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->unix_secs)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_UNIX_NSECS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->unix_nsecs)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SYSUPTIME) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->sysUpTime)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_EXADDR) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->exaddr)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_DFLOWS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dFlows)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DPKTS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dPkts)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DOCTETS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dOctets)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_FIRST) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->First)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_LAST) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->Last)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_ENGINE_TYPE) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->engine_type)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_ENGINE_ID) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->engine_id)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SRCADDR) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->srcaddr)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_DSTADDR) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->dstaddr)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_NEXTHOP) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->nexthop)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_INPUT) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->input)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_OUTPUT) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->output)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SRCPORT) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->srcport)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DSTPORT) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->dstport)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_PROT) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->prot)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_TOS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->tos)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_TCP_FLAGS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->tcp_flags)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SRC_MASK) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->src_mask)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DST_MASK) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->dst_mask)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SRC_AS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->src_as)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DST_AS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->dst_as)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_IN_ENCAPS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->in_encaps)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_OUT_ENCAPS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->out_encaps)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_PEER_NEXTHOP) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->peer_nexthop)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_ROUTER_SC) {
- if (comma) fmt_buf[len++] = ',';
- if (quote) fmt_buf[len++] = '"';
+ if (comma) fmt_buf[len++] = comma_char;
+ if (quote) fmt_buf[len++] = '\'';
len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->router_sc)),
FMT_JUST_LEFT);
- if (quote) fmt_buf[len++] = '"';
+ if (quote) fmt_buf[len++] = '\'';
comma = 1;
}
if (xfields & FT_XFIELD_MARKED_TOS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->marked_tos)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_EXTRA_PKTS) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->extra_pkts)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_SRC_TAG) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->src_tag)),
FMT_JUST_LEFT);
comma = 1;
}
if (xfields & FT_XFIELD_DST_TAG) {
- if (comma) fmt_buf[len++] = ',';
+ if (comma) fmt_buf[len++] = comma_char;
len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dst_tag)),
FMT_JUST_LEFT);
comma = 1;
_______________________________________________ Flow-tools mailing list [EMAIL PROTECTED] http://mailman.splintered.net/mailman/listinfo/flow-tools
