Hi: Another one for sqlbox. This is based on http://www.magicom-bcn.net/kannel/sqlbox-standalone-multi-20080227.patch by Alejandro Guerrieri, but it avoids the situation where we lose messages if we die in the middle of a batch. In this implementation, the worst-case scenario is that we repeat a message.
I've only implemented the batch processing for MySQL, since that's what I use, but everything is in place to implement the same for the rest of the DB backends. Also, I think it would be OK to clean up the single-message processing from gw/sqlbox.c once every backend implements the batch processing (which is a lot more performant and tunable; I could come up with some numbers if needed). There is a small comment to make about the List *msgids parameter: please note that this parameter is completely backend-controlled, to allow for backends that use multiple keys or even weirder things. gw/sqlbox.c only cares about calling gw_sql_batch_msg_done with the corresponding element from the list (which is assumed to be in the exact same order as the List *msgs parameter). It's no biggie (note that around 50% of mysql_fetch_batch() in sqlbox_mysql.c is the same msg processing as in mysql_fetch_msg() [1]), but it helps performance a great deal. diffstat sqlbox-0.7.2-batch_processing.patch sqlbox-cfg.def | 1 sqlbox.c | 30 ++++++++++++++++++++- sqlbox_mssql.c | 2 + sqlbox_mysql.c | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ sqlbox_mysql.h | 5 +++ sqlbox_oracle.c | 2 + sqlbox_pgsql.c | 2 + sqlbox_sdb.c | 2 + sqlbox_sql.h | 4 ++ sqlbox_sqlite.c | 2 + sqlbox_sqlite3.c | 2 + 11 files changed, 127 insertions(+), 1 deletion(-) Comments, corrections, commits ;) welcome. Again, this diff is against current stable; let me know if it needs updating. Damián Viano(Des). [1] Yeah, I know that phrase calls for a function, but that wouldn't make sense if we drop the single-message processing code.
diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox.c sqlbox-0.7.2_des/gw/sqlbox.c --- sqlbox-0.7.2/gw/sqlbox.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox.c 2009-06-08 17:47:49.000000000 -0300 @@ -86,6 +86,7 @@ static Octstr *box_allow_ip; static Octstr *box_deny_ip; static Octstr *global_sender; +static long limit_per_cycle; #ifndef HAVE_MSSQL #ifndef HAVE_MYSQL @@ -105,6 +106,7 @@ Octstr *sqlbox_id; #define SLEEP_BETWEEN_SELECTS 1.0 +#define DEFAULT_LIMIT_PER_CYCLE 10 typedef struct _boxc { Connection *smsbox_connection; @@ -528,6 +530,7 @@ Boxc *boxc; int fd; Msg *msg; + List *msgs, *msgids; boxc = gw_malloc(sizeof(Boxc)); boxc->bearerbox_connection = connect_to_bearerbox_real(bearerbox_host, bearerbox_port, bearerbox_port_ssl, NULL /* bb_our_host */); @@ -545,8 +548,28 @@ identify_to_bearerbox(boxc); + if (gw_sql_fetch_batch != NULL) { + msgs = gwlist_create(); + msgids = gwlist_create(); + } + while (sqlbox_status == SQL_RUNNING) { - if ((msg = gw_sql_fetch_msg()) != NULL) { + if (gw_sql_fetch_batch != NULL) { + debug("sqlbox", 0, "sql_to_bearerbox: using batch processing, limit: %ld", limit_per_cycle); + if (gw_sql_fetch_batch(msgs, msgids, limit_per_cycle) == 0) + gwthread_sleep(SLEEP_BETWEEN_SELECTS); + while ((msg = gwlist_consume(msgs)) != NULL ) { + if (global_sender != NULL && (msg->sms.sender == NULL || octstr_len(msg->sms.sender) == 0)) { + msg->sms.sender = octstr_duplicate(global_sender); + } + send_msg(boxc->bearerbox_connection, boxc, msg); + gw_sql_save_msg(msg, octstr_imm("MT")); + gw_sql_batch_msg_done(gwlist_consume(msgids)); + } + gw_assert(gwlist_len(msgids) == 0); + } + else if (gw_sql_fetch_batch == NULL && (msg = gw_sql_fetch_msg()) != NULL) { + debug("sqlbox", 0, "sql_to_bearerbox: using single message processing, limit: %ld", limit_per_cycle); if (global_sender != NULL && (msg->sms.sender == NULL || octstr_len(msg->sms.sender) == 0)) { msg->sms.sender = octstr_duplicate(global_sender); } @@ -685,6 +708,11 @@ if 
(cfg_get_integer(&sqlbox_port, grp, octstr_imm("smsbox-port")) == -1) sqlbox_port = 13005; + + /* setup limit per cycle */ + if (cfg_get_integer(&limit_per_cycle, grp, octstr_imm("limit-per-cycle")) == -1) + limit_per_cycle = DEFAULT_LIMIT_PER_CYCLE; + /* setup logfile stuff */ logfile = cfg_get(grp, octstr_imm("log-file")); diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox-cfg.def sqlbox-0.7.2_des/gw/sqlbox-cfg.def --- sqlbox-0.7.2/gw/sqlbox-cfg.def 2008-11-03 17:33:15.000000000 -0200 +++ sqlbox-0.7.2_des/gw/sqlbox-cfg.def 2009-06-08 15:09:07.000000000 -0300 @@ -21,4 +21,5 @@ OCTSTR(ssl-server-cert-file) OCTSTR(ssl-server-key-file) OCTSTR(ssl-trusted-ca-file) + OCTSTR(limit-per-cycle) ) diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mssql.c sqlbox-0.7.2_des/gw/sqlbox_mssql.c --- sqlbox-0.7.2/gw/sqlbox_mssql.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_mssql.c 2009-06-08 15:05:15.000000000 -0300 @@ -293,6 +293,8 @@ res->sql_leave = mssql_leave; res->sql_fetch_msg = mssql_fetch_msg; res->sql_save_msg = mssql_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } #endif diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mysql.c sqlbox-0.7.2_des/gw/sqlbox_mysql.c --- sqlbox-0.7.2/gw/sqlbox_mysql.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_mysql.c 2009-06-08 17:37:39.000000000 -0300 @@ -161,6 +161,80 @@ return msg; } +long mysql_fetch_batch(List *msgs, List *msgids, long limit) +{ + Msg *msg = NULL; + Octstr *sql, *id; + MYSQL_RES *res; + MYSQL_ROW row; + int amount; + + sql = octstr_format(SQLBOX_MYSQL_SELECT_BATCH_QUERY, sqlbox_insert_table, limit); +#if defined(SQLBOX_TRACE) + debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(sql)); +#endif + res = mysql_select(sql); + if (res == NULL) { + debug("sqlbox", 0, "SQL statement failed: %s", octstr_get_cstr(sql)); + } + else { + if (amount = mysql_num_rows(res) >= 1) { + while (row = mysql_fetch_row(res)) { + id = octstr_null_create(row[0]); + /* save fields in this 
row as msg struct */ + msg = msg_create(sms); + msg->sms.sender = octstr_null_create(row[2]); + msg->sms.receiver = octstr_null_create(row[3]); + msg->sms.udhdata = octstr_null_create(row[4]); + msg->sms.msgdata = octstr_null_create(row[5]); + msg->sms.time = atol_null(row[6]); + msg->sms.smsc_id = octstr_null_create(row[7]); + msg->sms.service = octstr_null_create(row[8]); + msg->sms.account = octstr_null_create(row[9]); + /* msg->sms.id = atol_null(row[10]); */ + msg->sms.sms_type = atol_null(row[11]); + msg->sms.mclass = atol_null(row[12]); + msg->sms.mwi = atol_null(row[13]); + msg->sms.coding = atol_null(row[14]); + msg->sms.compress = atol_null(row[15]); + msg->sms.validity = atol_null(row[16]); + msg->sms.deferred = atol_null(row[17]); + msg->sms.dlr_mask = atol_null(row[18]); + msg->sms.dlr_url = octstr_null_create(row[19]); + msg->sms.pid = atol_null(row[20]); + msg->sms.alt_dcs = atol_null(row[21]); + msg->sms.rpi = atol_null(row[22]); + msg->sms.charset = octstr_null_create(row[23]); + msg->sms.binfo = octstr_null_create(row[25]); + if (row[24] == NULL) { + msg->sms.boxc_id= octstr_duplicate(sqlbox_id); + } + else { + msg->sms.boxc_id= octstr_null_create(row[24]); + } + gwlist_produce(msgs, msg); + gwlist_produce(msgids, id); + } + } + mysql_free_result(res); + } + octstr_destroy(sql); + return amount; +} + +void mysql_batch_msg_done(void *id) +{ + Octstr *delet; + /* delete row by row */ + delet = octstr_format(SQLBOX_MYSQL_DELETE_QUERY, sqlbox_insert_table, (Octstr *)id); +#if defined(SQLBOX_TRACE) + debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(delet)); +#endif + mysql_update(delet); + octstr_destroy((Octstr *)id); + octstr_destroy(delet); +} + static Octstr *get_numeric_value_or_return_null(long int num) { if (num == -1) { @@ -309,6 +383,8 @@ res->sql_leave = mysql_leave; res->sql_fetch_msg = mysql_fetch_msg; res->sql_save_msg = mysql_save_msg; + res->sql_fetch_batch = mysql_fetch_batch; + res->sql_batch_msg_done = mysql_batch_msg_done; return res; } 
#endif diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_mysql.h sqlbox-0.7.2_des/gw/sqlbox_mysql.h --- sqlbox-0.7.2/gw/sqlbox_mysql.h 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_mysql.h 2009-06-08 17:36:38.000000000 -0300 @@ -31,6 +31,11 @@ compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \ charset, boxc_id, binfo FROM %S LIMIT 0,1" +#define SQLBOX_MYSQL_SELECT_BATCH_QUERY "SELECT sql_id, momt, sender, receiver, udhdata, \ +msgdata, time, smsc_id, service, account, id, sms_type, mclass, mwi, coding, \ +compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \ +charset, boxc_id, binfo FROM %S LIMIT 0,%ld" + #define SQLBOX_MYSQL_INSERT_QUERY "INSERT INTO %S (sql_id, momt, sender, \ receiver, udhdata, msgdata, time, smsc_id, service, account, sms_type, \ mclass, mwi, coding, compress, validity, deferred, dlr_mask, dlr_url, \ diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_oracle.c sqlbox-0.7.2_des/gw/sqlbox_oracle.c --- sqlbox-0.7.2/gw/sqlbox_oracle.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_oracle.c 2009-06-08 15:05:32.000000000 -0300 @@ -314,6 +314,8 @@ res->sql_leave = oracle_leave; res->sql_fetch_msg = oracle_fetch_msg; res->sql_save_msg = oracle_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } #endif diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_pgsql.c sqlbox-0.7.2_des/gw/sqlbox_pgsql.c --- sqlbox-0.7.2/gw/sqlbox_pgsql.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_pgsql.c 2009-06-08 15:05:41.000000000 -0300 @@ -376,6 +376,8 @@ res->sql_leave = pgsql_leave; res->sql_fetch_msg = pgsql_fetch_msg; res->sql_save_msg = pgsql_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sdb.c sqlbox-0.7.2_des/gw/sqlbox_sdb.c --- sqlbox-0.7.2/gw/sqlbox_sdb.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_sdb.c 2009-06-08 15:05:49.000000000 -0300 @@ -412,6 
+412,8 @@ res->sql_leave = sdb_leave; res->sql_fetch_msg = sdb_fetch_msg; res->sql_save_msg = sdb_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } #endif diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sql.h sqlbox-0.7.2_des/gw/sqlbox_sql.h --- sqlbox-0.7.2/gw/sqlbox_sql.h 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_sql.h 2009-06-08 14:26:34.000000000 -0300 @@ -16,6 +16,8 @@ void (*sql_leave) (); Msg *(*sql_fetch_msg) (); void (*sql_save_msg) (Msg *, Octstr *); + long (*sql_fetch_batch) (List *msgs, List *msgids, long limit); + void (*sql_batch_msg_done) (void *id); }; struct sqlbox_db_queries { @@ -41,6 +43,8 @@ #define gw_sql_save_msg sql_type->sql_save_msg #define gw_sql_enter sql_type->sql_enter #define gw_sql_leave sql_type->sql_leave +#define gw_sql_fetch_batch sql_type->sql_fetch_batch +#define gw_sql_batch_msg_done sql_type->sql_batch_msg_done /* Macro to run the queries to create tables */ #define sqlbox_run_query(query, table) \ diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sqlite3.c sqlbox-0.7.2_des/gw/sqlbox_sqlite3.c --- sqlbox-0.7.2/gw/sqlbox_sqlite3.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_sqlite3.c 2009-06-08 15:05:58.000000000 -0300 @@ -314,6 +314,8 @@ res->sql_leave = sqlite3_leave; res->sql_fetch_msg = sqlite3_fetch_msg; res->sql_save_msg = sqlite3_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } #endif diff -Nura -x '.*' sqlbox-0.7.2/gw/sqlbox_sqlite.c sqlbox-0.7.2_des/gw/sqlbox_sqlite.c --- sqlbox-0.7.2/gw/sqlbox_sqlite.c 2009-05-19 12:08:35.000000000 -0300 +++ sqlbox-0.7.2_des/gw/sqlbox_sqlite.c 2009-06-08 15:06:05.000000000 -0300 @@ -322,6 +322,8 @@ res->sql_leave = sqlite_leave; res->sql_fetch_msg = sqlite_fetch_msg; res->sql_save_msg = sqlite_save_msg; + res->sql_fetch_batch = NULL; + res->sql_batch_msg_done = NULL; return res; } #endif