format6 uses copy from stdin instead of inserts (format5)
more than 100 times faster, as for me :)
+ ' quotation instead of "
+ error with db_port in format 5
patch is for version 0.67 but works with 0.68 too

Alexey Bestchekov
--- flow-tools-0.67_orig/src/flow-export.c	2005-11-24 01:51:39.000000000 +0300
+++ flow-tools-0.67/src/flow-export.c	2005-11-24 01:47:57.000000000 +0300
@@ -64,12 +64,12 @@
 
 #include <libpq-fe.h>
 
-#define DB_DEFAULT_DBHOST "localhost"
-#define DB_DEFAULT_DBNAME "netflow"
-#define DB_DEFAULT_DBPORT "5432"
-#define DB_DEFAULT_DBTABLE "raw"
-#define DB_DEFAULT_DBUSER "netflow"
-#define DB_DEFAULT_DBPWD "netflow"
+#define DB_PG_DEFAULT_DBHOST "localhost"
+#define DB_PG_DEFAULT_DBNAME "netflow"
+#define DB_PG_DEFAULT_DBPORT "5432"
+#define DB_PG_DEFAULT_DBTABLE "raw"
+#define DB_PG_DEFAULT_DBUSER "netflow"
+#define DB_PG_DEFAULT_DBPWD "netflow"
 
 #endif /* PGSQL*/
 
@@ -103,18 +104,19 @@
 int format3(struct ftio *ftio, struct options *opt);
 int format4(struct ftio *ftio, struct options *opt);
 int format5(struct ftio *ftio, struct options *opt);
+int format6(struct ftio *ftio, struct options *opt);
 
 int ftxfield_tocflow(u_int64 xfields, u_int32 *cfmask);
 
 int fmt_xfields_val(char *fmt_buf, char *rec, struct fts3rec_offsets *fo,
-  u_int64 xfields, int quote);
+  u_int64 xfields, int quote, char comma_char );
 int fmt_xfields_type(char *buf, u_int64 xfield);
 
 void usage(void);
 
-#define NFORMATS 6 /* nformats - 1 */
+#define NFORMATS 7 /* nformats - 1 */
 struct jump format[] = {{format0}, {format1}, {format2}, {format3},
-                        {format4}, {format5}};
+                        {format4}, {format5}, {format6}};
 
 int main(int argc, char **argv)
 {
@@ -663,7 +665,7 @@
 
   while ((rec = ftio_read(ftio))) {
 
-    len = fmt_xfields_val(fmt_buf, rec, &fo, opt->ft_mask, 0);
+    len = fmt_xfields_val(fmt_buf, rec, &fo, opt->ft_mask, 0, ',');
 
     if (len)
       printf("%s\n", fmt_buf);
@@ -746,7 +748,7 @@
   /* foreach flow */
   while ((rec = ftio_read(ftio))) {
 
-    len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1);
+    len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1, ',');
 
     /* form SQL query and execute it */
     if (len) {
@@ -864,19 +866,19 @@
   struct ftver ftv;
   char fields[1024], values[1024], query[3*1024];
   char *rec;
-  char *db_host, *db_name, *db_table, *db_user, *db_pwd, *db_tmp, *tmp;
+  char *db_host, *db_name, *db_table, *db_user, *db_pwd, *tmp;
   char *db_port;
   int len;
 
   PGconn     *conn;
   PGresult   *res;
 
-  db_host = DB_DEFAULT_DBHOST;
-  db_name = DB_DEFAULT_DBNAME;
-  db_port = DB_DEFAULT_DBPORT;
-  db_user = DB_DEFAULT_DBUSER;
-  db_table = DB_DEFAULT_DBTABLE;
-  db_pwd = DB_DEFAULT_DBPWD;
+  db_host = DB_PG_DEFAULT_DBHOST;
+  db_name = DB_PG_DEFAULT_DBNAME;
+  db_port = DB_PG_DEFAULT_DBPORT;
+  db_user = DB_PG_DEFAULT_DBUSER;
+  db_table = DB_PG_DEFAULT_DBTABLE;
+  db_pwd = DB_PG_DEFAULT_DBPWD;
 
   /* parse URI string */
 
@@ -891,7 +893,7 @@
     db_name = strsep(&tmp, ":");
     db_table = strsep(&tmp, ":");
 
-    if (!db_user || !db_pwd || !db_host || !db_tmp || !db_name || !db_table) {
+    if (!db_user || !db_pwd || !db_host || !db_port || !db_name || !db_table) {
       fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
       return -1;
     }
@@ -917,7 +919,7 @@
   /* foreach flow */
   while ((rec = ftio_read(ftio))) {
 
-    len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1);
+    len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 1, ',');
 
     /* form SQL query and execute it */
     if (len) {
@@ -958,9 +960,134 @@
  
 } /* format5 */ 
 
+/*
+ * function: format6
+ *
+ * export flows (copy) into PostgreSQL Database
+ */
+int format6(struct ftio *ftio, struct options *opt)
+{
+#ifdef HAVE_PGSQL
+  struct fts3rec_offsets fo;
+  struct ftver ftv;
+  char fields[1024], values[1024], query[3*1024];
+  char *rec;
+  char *db_host, *db_name, *db_table, *db_user, *db_pwd, *tmp;
+  char *db_port;
+  int len;
+
+  PGconn     *conn;
+  PGresult   *res;
+
+  db_host = DB_PG_DEFAULT_DBHOST;
+  db_name = DB_PG_DEFAULT_DBNAME;
+  db_port = DB_PG_DEFAULT_DBPORT;
+  db_user = DB_PG_DEFAULT_DBUSER;
+  db_table = DB_PG_DEFAULT_DBTABLE;
+  db_pwd = DB_PG_DEFAULT_DBPWD;
+
+  /* parse URI string */
+
+  if (strlen(opt->dbaseURI)) {
+
+    tmp = opt->dbaseURI;
+
+    db_user = strsep(&tmp, ":");
+    db_pwd = strsep(&tmp, ":");
+    db_host = strsep(&tmp, ":");
+    db_port = strsep(&tmp, ":");
+    db_name = strsep(&tmp, ":");
+    db_table = strsep(&tmp, ":");
+
+    if (!db_user || !db_pwd || !db_host || !db_port || !db_name || !db_table) {
+      fterr_warnx("Missing field in dbaseURI, expecting user:pwd:host:port:name:table.");
+      return -1;
+    }
+
+  } /* dbaseURI */
+
+  ftio_get_ver(ftio, &ftv);
+
+  fts3rec_compute_offsets(&fo, &ftv);
+
+  /* remove invalid fields */
+  opt->ft_mask &= ftrec_xfield(&ftv);
+
+  /* generate the field names once */
+  len = fmt_xfields_type(fields, opt->ft_mask);
+
+  /* open PostgreSQL database */
+  conn = PQsetdbLogin(db_host, db_port, (char *) NULL, (char *) NULL, db_name, db_user, db_pwd);
+
+  if (PQstatus(conn) == CONNECTION_BAD) 
+    fterr_errx(1,"PQsetdbLogin(): %s\n", PQerrorMessage(conn));
+
+  if (len) {
+      strcpy (query, "COPY ");
+      strcat (query, db_table);
+      strcat (query, "(");
+      strcat (query, fields);
+      strcat (query, ")");
+      strcat (query, " FROM STDIN");
+  }
+
+  if (debug)
+      fprintf(stderr, "field=%s\n query=%s\n", fields, query);
+
+  res = PQexec(conn, query);
+//  if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) {
+//    PQclear(res);
+//    fterr_errx(1,"PQexec(): %s\n", PQerrorMessage(conn));
+//  }
+  
+  /* foreach flow */
+  while ((rec = ftio_read(ftio))) {
+
+    len = fmt_xfields_val(values, rec, &fo, opt->ft_mask, 0, '\t');
+
+    /* form SQL query and execute it */
+    if (len) {
+      if (debug)
+        fprintf(stderr, "%s\n", values);
+      
+      strcpy (query, values);
+      strcat (query, "\n");
+
+      if (PQputline(conn, query)) {
+        fterr_errx(1,"PQputline(): %s\n", PQerrorMessage(conn));
+      }
+
+    }
+
+    ++opt->records;
+
+  } /* while */
+
+  if (debug)
+    fprintf(stderr, "\\.\n");
+  if (PQputline(conn,"\\.\n")) {
+    fterr_errx(1,"PQputline(): %s\n", PQerrorMessage(conn));
+  }
+  
+  if (PQendcopy(conn)) {
+    fterr_errx(1,"PQendcopy(): %s\n", PQerrorMessage(conn));
+  }
+  
+  /* close database */
+  PQfinish(conn);
+
+#else /* PGSQL */
+
+  fterr_warnx("Format not supported");
+
+#endif /* PGSQL */
+
+  return 0;
+ 
+} /* format6 */ 
+
 int fmt_xfields_type(char *buf, u_int64 xfield)
 {
-  int comma;
+  int comma=0;
 
   buf[0] = 0;
 
@@ -1168,7 +1295,7 @@
 
 
 int fmt_xfields_val(char *fmt_buf, char *rec, struct fts3rec_offsets *fo,
-  u_int64 xfields, int quote)
+  u_int64 xfields, int quote, char comma_char )
 {
   int comma, len;
 
@@ -1177,243 +1304,243 @@
   len = comma = 0;
 
   if (xfields & FT_XFIELD_UNIX_SECS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->unix_secs)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_UNIX_NSECS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->unix_nsecs)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SYSUPTIME) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->sysUpTime)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_EXADDR) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->exaddr)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DFLOWS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dFlows)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DPKTS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dPkts)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DOCTETS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dOctets)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_FIRST) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->First)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_LAST) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->Last)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_ENGINE_TYPE) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->engine_type)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_ENGINE_ID) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->engine_id)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SRCADDR) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->srcaddr)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DSTADDR) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->dstaddr)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_NEXTHOP) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->nexthop)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_INPUT) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->input)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_OUTPUT) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->output)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SRCPORT) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->srcport)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DSTPORT) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->dstport)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_PROT) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->prot)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_TOS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->tos)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_TCP_FLAGS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->tcp_flags)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SRC_MASK) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->src_mask)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DST_MASK) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->dst_mask)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SRC_AS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->src_as)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DST_AS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint16(fmt_buf+len, *((u_int16*)(rec+fo->dst_as)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_IN_ENCAPS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->in_encaps)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_OUT_ENCAPS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->out_encaps)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_PEER_NEXTHOP) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->peer_nexthop)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_ROUTER_SC) {
-    if (comma) fmt_buf[len++] = ',';
-    if (quote) fmt_buf[len++] = '"';
+    if (comma) fmt_buf[len++] = comma_char;
+    if (quote) fmt_buf[len++] = '\'';
     len += fmt_ipv4(fmt_buf+len, *((u_int32*)(rec+fo->router_sc)),
       FMT_JUST_LEFT);
-    if (quote) fmt_buf[len++] = '"';
+    if (quote) fmt_buf[len++] = '\'';
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_MARKED_TOS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint8(fmt_buf+len, *((u_int8*)(rec+fo->marked_tos)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_EXTRA_PKTS) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->extra_pkts)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_SRC_TAG) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->src_tag)),
       FMT_JUST_LEFT);
     comma = 1;
   }
 
   if (xfields & FT_XFIELD_DST_TAG) {
-    if (comma) fmt_buf[len++] = ',';
+    if (comma) fmt_buf[len++] = comma_char;
     len += fmt_uint32(fmt_buf+len, *((u_int32*)(rec+fo->dst_tag)),
       FMT_JUST_LEFT);
     comma = 1;
_______________________________________________
Flow-tools mailing list
[EMAIL PROTECTED]
http://mailman.splintered.net/mailman/listinfo/flow-tools

Reply via email to