Hi:

Another one for sqlbox. This is based on
http://www.magicom-bcn.net/kannel/sqlbox-standalone-multi-20080227.patch by
Alejandro Guerrieri, but it avoids the situation where we lose messages if we
die in the middle of a batch. In this implementation, the worst-case scenario
is that we repeat a message.

I've only implemented the batch processing for mysql, since that's what I use,
but everything is in place to implement the same for the rest of the db 
backends.

Also, I think it would be OK to clean up the single-message processing in
gw/sqlbox.c once every backend implements batch processing (which is a lot
more performant and tunable; I could come up with some numbers if needed).

There is a small comment to make about the List *msgids parameter: please note
that this parameter is completely backend-controlled, to allow backends to use
multiple keys or even weirder things. gw/sqlbox.c only cares about calling
gw_sql_batch_msg_done with the corresponding element from the list (which is
assumed to be in the exact same order as the List *msgs parameter).

It's no biggie (note that around 50% of mysql_fetch_batch() in sqlbox_mysql.c
is the same msg processing as in mysql_fetch_msg() [1]), but it helps
performance a great deal.

diffstat sqlbox-0.7.2-batch_processing.patch
 sqlbox-cfg.def   |    1 
 sqlbox.c         |   30 ++++++++++++++++++++-
 sqlbox_mssql.c   |    2 +
 sqlbox_mysql.c   |   76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 sqlbox_mysql.h   |    5 +++
 sqlbox_oracle.c  |    2 +
 sqlbox_pgsql.c   |    2 +
 sqlbox_sdb.c     |    2 +
 sqlbox_sql.h     |    4 ++
 sqlbox_sqlite.c  |    2 +
 sqlbox_sqlite3.c |    2 +
 11 files changed, 127 insertions(+), 1 deletion(-)


Comments, corrections, commits ;) welcome

Again, this diff is against current stable, let me know if it needs updating.

  Damián Viano(Des).

[1] Yeah, I know that phrase calls for a function, but that wouldn't make sense
if we drop the single-message processing code.
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox.c sqlbox-0.7.2_des/gw/sqlbox.c
--- sqlbox-0.7.2/gw/sqlbox.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox.c	2009-06-08 17:47:49.000000000 -0300
@@ -86,6 +86,7 @@
 static Octstr *box_allow_ip;
 static Octstr *box_deny_ip;
 static Octstr *global_sender;
+static long limit_per_cycle;
 
 #ifndef HAVE_MSSQL
 #ifndef HAVE_MYSQL
@@ -105,6 +106,7 @@
 Octstr *sqlbox_id;
 
 #define SLEEP_BETWEEN_SELECTS 1.0
+#define DEFAULT_LIMIT_PER_CYCLE 10
 
 typedef struct _boxc {
     Connection    *smsbox_connection;
@@ -528,6 +530,7 @@
     Boxc *boxc;
     int fd;
     Msg *msg;
+    List *msgs, *msgids;
 
     boxc = gw_malloc(sizeof(Boxc));
     boxc->bearerbox_connection = connect_to_bearerbox_real(bearerbox_host, bearerbox_port, bearerbox_port_ssl, NULL /* bb_our_host */);
@@ -545,8 +548,28 @@
 
     identify_to_bearerbox(boxc);
 
+    if (gw_sql_fetch_batch != NULL) {
+       msgs = gwlist_create();
+       msgids = gwlist_create();
+    }
+
     while (sqlbox_status == SQL_RUNNING) {
-        if ((msg = gw_sql_fetch_msg()) != NULL) {
+        if (gw_sql_fetch_batch != NULL) {
+            debug("sqlbox", 0, "sql_to_bearerbox: using batch processing, limit: %ld", limit_per_cycle);
+            if (gw_sql_fetch_batch(msgs, msgids, limit_per_cycle) == 0)
+                gwthread_sleep(SLEEP_BETWEEN_SELECTS);
+            while ((msg = gwlist_consume(msgs)) != NULL ) {
+                if (global_sender != NULL && (msg->sms.sender == NULL || octstr_len(msg->sms.sender) == 0)) {
+                    msg->sms.sender = octstr_duplicate(global_sender);
+                }
+                send_msg(boxc->bearerbox_connection, boxc, msg);
+                gw_sql_save_msg(msg, octstr_imm("MT"));
+                gw_sql_batch_msg_done(gwlist_consume(msgids));
+            }
+            gw_assert(gwlist_len(msgids) == 0);
+        }
+        else if (gw_sql_fetch_batch == NULL && (msg = gw_sql_fetch_msg()) != NULL) {
+            debug("sqlbox", 0, "sql_to_bearerbox: using single message processing, limit: %ld", limit_per_cycle);
             if (global_sender != NULL && (msg->sms.sender == NULL || octstr_len(msg->sms.sender) == 0)) {
                 msg->sms.sender = octstr_duplicate(global_sender);
             }
@@ -685,6 +708,11 @@
 
     if (cfg_get_integer(&sqlbox_port, grp, octstr_imm("smsbox-port")) == -1)
         sqlbox_port = 13005;
+
+    /* setup limit per cycle */
+    if (cfg_get_integer(&limit_per_cycle, grp, octstr_imm("limit-per-cycle")) == -1)
+        limit_per_cycle = DEFAULT_LIMIT_PER_CYCLE;
+
     /* setup logfile stuff */
     logfile = cfg_get(grp, octstr_imm("log-file"));
 
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox-cfg.def sqlbox-0.7.2_des/gw/sqlbox-cfg.def
--- sqlbox-0.7.2/gw/sqlbox-cfg.def	2008-11-03 17:33:15.000000000 -0200
+++ sqlbox-0.7.2_des/gw/sqlbox-cfg.def	2009-06-08 15:09:07.000000000 -0300
@@ -21,4 +21,5 @@
     OCTSTR(ssl-server-cert-file)
     OCTSTR(ssl-server-key-file)
     OCTSTR(ssl-trusted-ca-file)
+    OCTSTR(limit-per-cycle)
 )
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mssql.c sqlbox-0.7.2_des/gw/sqlbox_mssql.c
--- sqlbox-0.7.2/gw/sqlbox_mssql.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_mssql.c	2009-06-08 15:05:15.000000000 -0300
@@ -293,6 +293,8 @@
     res->sql_leave = mssql_leave;
     res->sql_fetch_msg = mssql_fetch_msg;
     res->sql_save_msg = mssql_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 #endif
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mysql.c sqlbox-0.7.2_des/gw/sqlbox_mysql.c
--- sqlbox-0.7.2/gw/sqlbox_mysql.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_mysql.c	2009-06-08 17:37:39.000000000 -0300
@@ -161,6 +161,80 @@
     return msg;
 }
 
+/*
+ * Fetch up to `limit` rows from sqlbox_insert_table; for each row a Msg is
+ * appended to msgs and its sql_id (an Octstr) to msgids, in the same order.
+ * Rows are not deleted here. Returns the number of rows in the result set,
+ * or 0 if the query failed or returned nothing.
+ */
+long mysql_fetch_batch(List *msgs, List *msgids, long limit)
+{
+    Msg *msg;
+    Octstr *sql, *id;
+    MYSQL_RES *res;
+    MYSQL_ROW row;
+    long amount = 0; /* stays 0 when the select fails, so the caller sleeps */
+
+    sql = octstr_format(SQLBOX_MYSQL_SELECT_BATCH_QUERY, sqlbox_insert_table, limit);
+#if defined(SQLBOX_TRACE)
+    debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(sql));
+#endif
+    res = mysql_select(sql);
+    if (res == NULL) {
+        debug("sqlbox", 0, "SQL statement failed: %s", octstr_get_cstr(sql));
+    }
+    else {
+        amount = (long) mysql_num_rows(res);
+        while ((row = mysql_fetch_row(res)) != NULL) {
+            id = octstr_null_create(row[0]);
+            /* save fields in this row as msg struct; row[1] and row[10] are not copied */
+            msg = msg_create(sms);
+            msg->sms.sender     = octstr_null_create(row[2]);
+            msg->sms.receiver   = octstr_null_create(row[3]);
+            msg->sms.udhdata    = octstr_null_create(row[4]);
+            msg->sms.msgdata    = octstr_null_create(row[5]);
+            msg->sms.time       = atol_null(row[6]);
+            msg->sms.smsc_id    = octstr_null_create(row[7]);
+            msg->sms.service    = octstr_null_create(row[8]);
+            msg->sms.account    = octstr_null_create(row[9]);
+            msg->sms.sms_type   = atol_null(row[11]);
+            msg->sms.mclass     = atol_null(row[12]);
+            msg->sms.mwi        = atol_null(row[13]);
+            msg->sms.coding     = atol_null(row[14]);
+            msg->sms.compress   = atol_null(row[15]);
+            msg->sms.validity   = atol_null(row[16]);
+            msg->sms.deferred   = atol_null(row[17]);
+            msg->sms.dlr_mask   = atol_null(row[18]);
+            msg->sms.dlr_url    = octstr_null_create(row[19]);
+            msg->sms.pid        = atol_null(row[20]);
+            msg->sms.alt_dcs    = atol_null(row[21]);
+            msg->sms.rpi        = atol_null(row[22]);
+            msg->sms.charset    = octstr_null_create(row[23]);
+            msg->sms.binfo      = octstr_null_create(row[25]);
+            /* a row without a boxc_id gets a copy of sqlbox_id instead */
+            msg->sms.boxc_id    = row[24] ? octstr_null_create(row[24]) : octstr_duplicate(sqlbox_id);
+            gwlist_produce(msgs, msg);
+            gwlist_produce(msgids, id);
+        }
+        mysql_free_result(res);
+    }
+    octstr_destroy(sql);
+    return amount;
+}
+
+/* Run SQLBOX_MYSQL_DELETE_QUERY on sqlbox_insert_table for id, which is used
+ * as an Octstr; id is destroyed here, so the caller must not use it again. */
+void mysql_batch_msg_done(void *id)
+{
+    Octstr *query = octstr_format(SQLBOX_MYSQL_DELETE_QUERY, sqlbox_insert_table, (Octstr *)id);
+#if defined(SQLBOX_TRACE)
+    debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(query));
+#endif
+    mysql_update(query);
+    octstr_destroy(query);
+    octstr_destroy((Octstr *)id);
+}
+
 static Octstr *get_numeric_value_or_return_null(long int num)
 {
     if (num == -1) {
@@ -309,6 +383,8 @@
     res->sql_leave = mysql_leave;
     res->sql_fetch_msg = mysql_fetch_msg;
     res->sql_save_msg = mysql_save_msg;
+    res->sql_fetch_batch = mysql_fetch_batch;
+    res->sql_batch_msg_done = mysql_batch_msg_done;
     return res;
 }
 #endif
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mysql.h sqlbox-0.7.2_des/gw/sqlbox_mysql.h
--- sqlbox-0.7.2/gw/sqlbox_mysql.h	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_mysql.h	2009-06-08 17:36:38.000000000 -0300
@@ -31,6 +31,11 @@
 compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \
 charset, boxc_id, binfo FROM %S LIMIT 0,1"
 
+#define SQLBOX_MYSQL_SELECT_BATCH_QUERY "SELECT sql_id, momt, sender, receiver, udhdata, \
+msgdata, time, smsc_id, service, account, id, sms_type, mclass, mwi, coding, \
+compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \
+charset, boxc_id, binfo FROM %S LIMIT 0,%ld"
+
 #define SQLBOX_MYSQL_INSERT_QUERY "INSERT INTO %S (sql_id, momt, sender, \
 receiver, udhdata, msgdata, time, smsc_id, service, account, sms_type, \
 mclass, mwi, coding, compress, validity, deferred, dlr_mask, dlr_url, \
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_oracle.c sqlbox-0.7.2_des/gw/sqlbox_oracle.c
--- sqlbox-0.7.2/gw/sqlbox_oracle.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_oracle.c	2009-06-08 15:05:32.000000000 -0300
@@ -314,6 +314,8 @@
     res->sql_leave = oracle_leave;
     res->sql_fetch_msg = oracle_fetch_msg;
     res->sql_save_msg = oracle_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 #endif
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_pgsql.c sqlbox-0.7.2_des/gw/sqlbox_pgsql.c
--- sqlbox-0.7.2/gw/sqlbox_pgsql.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_pgsql.c	2009-06-08 15:05:41.000000000 -0300
@@ -376,6 +376,8 @@
     res->sql_leave = pgsql_leave;
     res->sql_fetch_msg = pgsql_fetch_msg;
     res->sql_save_msg = pgsql_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sdb.c sqlbox-0.7.2_des/gw/sqlbox_sdb.c
--- sqlbox-0.7.2/gw/sqlbox_sdb.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_sdb.c	2009-06-08 15:05:49.000000000 -0300
@@ -412,6 +412,8 @@
     res->sql_leave = sdb_leave;
     res->sql_fetch_msg = sdb_fetch_msg;
     res->sql_save_msg = sdb_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 #endif
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sql.h sqlbox-0.7.2_des/gw/sqlbox_sql.h
--- sqlbox-0.7.2/gw/sqlbox_sql.h	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_sql.h	2009-06-08 14:26:34.000000000 -0300
@@ -16,6 +16,8 @@
     void (*sql_leave) ();
     Msg *(*sql_fetch_msg) ();
     void (*sql_save_msg) (Msg *, Octstr *);
+    long (*sql_fetch_batch) (List *msgs, List *msgids, long limit);
+    void (*sql_batch_msg_done) (void *id);
 };
 
 struct sqlbox_db_queries {
@@ -41,6 +43,8 @@
 #define gw_sql_save_msg sql_type->sql_save_msg
 #define gw_sql_enter sql_type->sql_enter
 #define gw_sql_leave sql_type->sql_leave
+#define gw_sql_fetch_batch sql_type->sql_fetch_batch
+#define gw_sql_batch_msg_done sql_type->sql_batch_msg_done
 
 /* Macro to run the queries to create tables */
 #define sqlbox_run_query(query, table) \
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sqlite3.c sqlbox-0.7.2_des/gw/sqlbox_sqlite3.c
--- sqlbox-0.7.2/gw/sqlbox_sqlite3.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_sqlite3.c	2009-06-08 15:05:58.000000000 -0300
@@ -314,6 +314,8 @@
     res->sql_leave = sqlite3_leave;
     res->sql_fetch_msg = sqlite3_fetch_msg;
     res->sql_save_msg = sqlite3_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 #endif
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sqlite.c sqlbox-0.7.2_des/gw/sqlbox_sqlite.c
--- sqlbox-0.7.2/gw/sqlbox_sqlite.c	2009-05-19 12:08:35.000000000 -0300
+++ sqlbox-0.7.2_des/gw/sqlbox_sqlite.c	2009-06-08 15:06:05.000000000 -0300
@@ -322,6 +322,8 @@
     res->sql_leave = sqlite_leave;
     res->sql_fetch_msg = sqlite_fetch_msg;
     res->sql_save_msg = sqlite_save_msg;
+    res->sql_fetch_batch = NULL;
+    res->sql_batch_msg_done = NULL;
     return res;
 }
 #endif

Reply via email to