Index: sqlbox-cfg.def
===================================================================
--- sqlbox-cfg.def	(revision 52)
+++ sqlbox-cfg.def	(working copy)
@@ -21,4 +21,6 @@
     OCTSTR(ssl-server-cert-file)
     OCTSTR(ssl-server-key-file)
     OCTSTR(ssl-trusted-ca-file)
+    OCTSTR(limit-per-cycle)
+    OCTSTR(sql-log-table-split)
 )
Index: sqlbox.c
===================================================================
--- sqlbox.c	(revision 52)
+++ sqlbox.c	(working copy)
@@ -85,6 +85,7 @@
 static Octstr *bearerbox_host;
 static int bearerbox_port_ssl = 0;
 static Octstr *global_sender;
+static long limit_per_cycle;
 
 #ifndef HAVE_MSSQL
 #ifndef HAVE_MYSQL
@@ -103,8 +104,15 @@
 #endif
 Octstr *sqlbox_id;
 
-#define SLEEP_BETWEEN_SELECTS 1.0
+/* changed name from SLEEP_BETWEEN_SELECTS to SLEEP_BETWEEN_EMPTY_SELECTS
+ * to reflect what this does.
+ * If needed we can make this configurable from config.
+ */
+#define SLEEP_BETWEEN_EMPTY_SELECTS 1.0
 
+/* this defines how many msg will be fetched from database on each select cycle */
+#define DEFAULT_LIMIT_PER_CYCLE 10
+
 typedef struct _boxc {
     Connection    *smsbox_connection;
     Connection    *bearerbox_connection;
@@ -420,7 +428,6 @@
                 break;
             }
         }
-
         if (msg_type(msg) == heartbeat) {
         // todo
             debug("sqlbox", 0, "bearerbox_to_smsbox: catch an heartbeat - we are alive");
@@ -522,6 +529,7 @@
 {
     Boxc *conn = (Boxc *)arg;
     Msg *msg, *mack;
+    Octstr *sql;
 
     while (sqlbox_status == SQL_RUNNING && conn->alive) {
         msg = read_from_box(conn->bearerbox_connection, conn);
@@ -560,8 +568,11 @@
 	    uuid_copy(mack->ack.id, msg->sms.id);
 	    send_msg(conn->bearerbox_connection, conn, mack);
 	    msg_destroy(mack);
-
         }
+		/* we need the ack here to handle actions on sql-insert-table */
+        else if (msg_type(msg) == ack) {
+            gw_sql_save_msg(msg, octstr_imm("MT"));
+        }
 
         msg_destroy(msg);
     }
@@ -571,6 +582,7 @@
 {
     Boxc *boxc;
     Msg *msg;
+    List *qlist;
 
     boxc = gw_malloc(sizeof(Boxc));
     boxc->bearerbox_connection = connect_to_bearerbox_real(bearerbox_host, bearerbox_port, bearerbox_port_ssl, NULL /* bb_our_host */);
@@ -588,8 +600,12 @@
 
     identify_to_bearerbox(boxc);
 
+    qlist = gwlist_create();
+    gwlist_add_producer(qlist);
+
     while (sqlbox_status == SQL_RUNNING && boxc->alive) {
-        if ((msg = gw_sql_fetch_msg()) != NULL) {
+        if ( gw_sql_fetch_msg(qlist, limit_per_cycle) > 0 ) {
+            while((gwlist_len(qlist)>0) && ((msg = gwlist_consume(qlist)) != NULL )) {
             if (charset_processing(msg) == -1) {
                 error(0, "Could not charset process message, dropping it!");
                 msg_destroy(msg);
@@ -598,28 +614,50 @@
             if (global_sender != NULL && (msg->sms.sender == NULL || octstr_len(msg->sms.sender) == 0)) {
                 msg->sms.sender = octstr_duplicate(global_sender);
             }
-            /* convert validity and deferred to unix timestamp */
+                /* convert validity and deferred to unix timestamp 
+				 * REMEMBER - we have those data in MINUTES in databases
+				 */
             if (msg->sms.validity != SMS_PARAM_UNDEFINED)
                 msg->sms.validity = time(NULL) + msg->sms.validity * 60;
             if (msg->sms.deferred != SMS_PARAM_UNDEFINED)
                 msg->sms.deferred = time(NULL) + msg->sms.deferred * 60;
-            send_msg(boxc->bearerbox_connection, boxc, msg);
 
+				/* send_msg - returns: 0 on success, -1 on failure */
+                if (send_msg(boxc->bearerbox_connection, boxc, msg) == -1) {
+					/* temp error message for now
+					 * TODO - FIX: handle this case also on database x_status
+					 */
+					error(0, "FAILED sql_to_bearerbox:send_msg, dropping it!");
+				}
+
             /* convert validity & deferred back to minutes
              * TODO clarify why we fetched message from DB and then insert it back here???
+                 *
+				 * WE do not need this anymore
+				 * DO NOT DELETE the msg from sql-insert-table
+				 * DO NOT INSERT the msg in sql-log-table
+				 * WE WAIT for ACK from bearerbox (bearerbox_to_sql) to perform those actions
              */
+				/* 
             if (msg->sms.validity != SMS_PARAM_UNDEFINED)
                 msg->sms.validity = (msg->sms.validity - time(NULL))/60;
             if (msg->sms.deferred != SMS_PARAM_UNDEFINED)
                 msg->sms.deferred = (msg->sms.deferred - time(NULL))/60;
             gw_sql_save_msg(msg, octstr_imm("MT"));
+				*/
+				
+				/* I think we need this here - CHECK and CONFIRM ??? */
+				msg_destroy(msg);
         }
-        else {
-            gwthread_sleep(SLEEP_BETWEEN_SELECTS);
+        } else {
+            gwthread_sleep(SLEEP_BETWEEN_EMPTY_SELECTS);
         }
-        msg_destroy(msg);
+		/* Remove from here - CHECK and CONFIRM - mem leak??? */
+        //msg_destroy(msg);
     }
 
+    gwlist_remove_producer(qlist);
+    gwlist_destroy(qlist,NULL); /* is this really needed ??? */
     boxc_destroy(boxc);
 }
 
@@ -745,6 +783,11 @@
 
     if (cfg_get_integer(&sqlbox_port, grp, octstr_imm("smsbox-port")) == -1)
         sqlbox_port = 13005;
+
+    /* setup limit per cycle */
+    if (cfg_get_integer(&limit_per_cycle, grp, octstr_imm("limit-per-cycle")) == -1)
+        limit_per_cycle = DEFAULT_LIMIT_PER_CYCLE;
+
     /* setup logfile stuff */
     logfile = cfg_get(grp, octstr_imm("log-file"));
 
Index: sqlbox_mysql.c
===================================================================
--- sqlbox_mysql.c	(revision 52)
+++ sqlbox_mysql.c	(working copy)
@@ -10,6 +10,7 @@
 
 static Octstr *sqlbox_logtable;
 static Octstr *sqlbox_insert_table;
+static int sqlbox_logtable_split = 0;
 
 /*
  * Our connection pool to mysql.
@@ -17,9 +18,10 @@
 
 static DBPool *pool = NULL;
 
-static void mysql_update(const Octstr *sql)
+/* Returns 0 if mysql_query FAILS - 1 otherwise */
+static int mysql_update(const Octstr *sql)
 {
-    int state;
+    int state = 0;
     DBPoolConn *pc;
 
 #if defined(SQLBOX_TRACE)
@@ -29,14 +31,21 @@
     pc = dbpool_conn_consume(pool);
     if (pc == NULL) {
         error(0, "MYSQL: Database pool got no connection! DB update failed!");
-        return;
+        return state;
     }
 
     state = mysql_query(pc->conn, octstr_get_cstr(sql));
-    if (state != 0)
+    if (state != 0) {
         error(0, "MYSQL: %s", mysql_error(pc->conn));
+        state = 0;
+    }
+    else {
+        state = 1;
+    }
 
     dbpool_conn_produce(pc);
+
+    return state;
 }
 
 static MYSQL_RES* mysql_select(const Octstr *sql)
@@ -83,11 +92,26 @@
     if (sqlbox_insert_table == NULL) {
         panic(0, "No 'sql-insert-table' not configured.");
     }
-
-    /* create send_sms && sent_sms tables if they do not exist */
+    cfg_get_bool(&sqlbox_logtable_split, grp, octstr_imm("sql-log-table-split"));
+    if (sqlbox_logtable_split < 0)
+        sqlbox_logtable_split = 0;    
+    /* create send_sms && sent_sms[MO MT DLR] tables if they do not exist */
+    if (sqlbox_logtable_split) {
+        sql = octstr_format(SQLBOX_MYSQL_CREATE_LOG_TABLE, octstr_cat(sqlbox_logtable, octstr_imm("MO")));
+        sql_update(sql);
+        octstr_destroy(sql);
+        sql = octstr_format(SQLBOX_MYSQL_CREATE_LOG_TABLE, octstr_cat(sqlbox_logtable, octstr_imm("MT")));
+        sql_update(sql);
+        octstr_destroy(sql);
+        sql = octstr_format(SQLBOX_MYSQL_CREATE_LOG_TABLE, octstr_cat(sqlbox_logtable, octstr_imm("DLR")));
+        sql_update(sql);
+        octstr_destroy(sql);
+    }
+    else {
     sql = octstr_format(SQLBOX_MYSQL_CREATE_LOG_TABLE, sqlbox_logtable);
     sql_update(sql);
     octstr_destroy(sql);
+    }
     sql = octstr_format(SQLBOX_MYSQL_CREATE_INSERT_TABLE, sqlbox_insert_table);
     sql_update(sql);
     octstr_destroy(sql);
@@ -96,14 +120,19 @@
 
 #define octstr_null_create(x) ((x != NULL) ? octstr_create(x) : octstr_create(""))
 #define atol_null(x) ((x != NULL) ? atol(x) : -1)
-Msg *mysql_fetch_msg()
+int mysql_fetch_msg(List *qlist, long limit)
 {
     Msg *msg = NULL;
-    Octstr *sql, *delet, *id;
     MYSQL_RES *res;
     MYSQL_ROW row;
+    Octstr *sql;
+    Octstr *x_update;
+    Octstr *stored_local_uuid;
+    Octstr *id;
+    char local_uuid[UUID_STR_LEN + 1];
+    int ret = 0;
 
-    sql = octstr_format(SQLBOX_MYSQL_SELECT_QUERY, sqlbox_insert_table);
+    sql = octstr_format(SQLBOX_MYSQL_SELECT_QUERY, sqlbox_insert_table, limit);
 #if defined(SQLBOX_TRACE)
      debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(sql));
 #endif
@@ -112,8 +141,10 @@
         debug("sqlbox", 0, "SQL statement failed: %s", octstr_get_cstr(sql));
     }
     else {
-        if (mysql_num_rows(res) >= 1) {
-            row = mysql_fetch_row(res);
+        ret = mysql_num_rows(res);
+        if (ret >= 1) {
+			//debug("sqlbox", 0, "Rows found: %i", ret);
+            while (row = mysql_fetch_row(res)) {
             id = octstr_null_create(row[0]);
             /* save fields in this row as msg struct */
             msg = msg_create(sms);
@@ -147,19 +178,32 @@
             else {
                 msg->sms.boxc_id= octstr_null_create(row[24]);
             }
-            /* delete current row */
-            delet = octstr_format(SQLBOX_MYSQL_DELETE_QUERY, sqlbox_insert_table, id);
+                /* we need sms.id for later ack - match from bearerbox (bearerbox_to_sql) */
+                if (!uuid_is_null(msg->sms.id)) {
+                    uuid_unparse(msg->sms.id, local_uuid);
+                    stored_local_uuid = octstr_create(local_uuid);
+                    x_update = octstr_format(X_SQLBOX_MYSQL_UPDATE_ID_QUERY, sqlbox_insert_table, stored_local_uuid, id);
 #if defined(SQLBOX_TRACE)
-            debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(delet));
+                    debug("SQLBOX", 0, "sql: %s", octstr_get_cstr(x_update));
 #endif
-            mysql_update(delet);
+                    /* insert msg into qlist only if update succeed */
+                    if (sql_update(x_update)) {
+                        gwlist_produce(qlist, msg);
+                    }
+                    /* we could use "else { ret--; }" here */
+					//octstr_destroy(stored_local_uuid);
+					//octstr_destroy(x_update);
+                }
+                /*  we could use "else { ret--; }" also here */
             octstr_destroy(id);
-            octstr_destroy(delet);
+                octstr_destroy(stored_local_uuid);
+                octstr_destroy(x_update);
         }
+        }
         mysql_free_result(res);
     }
     octstr_destroy(sql);
-    return msg;
+    return ret;
 }
 
 static Octstr *get_numeric_value_or_return_null(long int num)
@@ -186,15 +230,31 @@
 #define st_num(x) (stuffer[stuffcount++] = get_numeric_value_or_return_null(x))
 #define st_str(x) (stuffer[stuffcount++] = get_string_value_or_return_null(x))
 
-void mysql_save_msg(Msg *msg, Octstr *momt /*, Octstr smsbox_id */)
+void mysql_save_msg(Msg *msg, Octstr *momt)
 {
-    Octstr *sql;
+    Octstr *sql, *sqld;
     Octstr *stuffer[30];
     int stuffcount = 0;
 
-    sql = octstr_format(SQLBOX_MYSQL_INSERT_QUERY, sqlbox_logtable, st_str(momt), st_str(msg->sms.sender),
+    Octstr *sqlbox_logtable_tmp = NULL;
+    char local_uuid[UUID_STR_LEN + 1];
+    Octstr *stored_local_uuid = NULL;
+
+    if (sqlbox_logtable_split) {
+        sqlbox_logtable_tmp = octstr_cat(sqlbox_logtable, momt);
+    }
+    else {
+        sqlbox_logtable_tmp = octstr_duplicate(sqlbox_logtable);
+    }
+/* SMS Type */
+    if (msg_type(msg) == sms) {
+        if (!uuid_is_null(msg->sms.id)) {
+            uuid_unparse(msg->sms.id, local_uuid);
+            stored_local_uuid = octstr_create(local_uuid);
+        }
+        sql = octstr_format(SQLBOX_MYSQL_INSERT_QUERY, sqlbox_logtable_tmp, st_str(momt), st_str(msg->sms.sender),
         st_str(msg->sms.receiver), st_str(msg->sms.udhdata), st_str(msg->sms.msgdata), st_num(msg->sms.time),
-        st_str(msg->sms.smsc_id), st_str(msg->sms.service), st_str(msg->sms.account), st_num(msg->sms.sms_type),
+            st_str(msg->sms.smsc_id), st_str(msg->sms.service), st_str(msg->sms.account), st_str(stored_local_uuid), st_num(msg->sms.sms_type),
         st_num(msg->sms.mclass), st_num(msg->sms.mwi), st_num(msg->sms.coding), st_num(msg->sms.compress),
         st_num(msg->sms.validity), st_num(msg->sms.deferred), st_num(msg->sms.dlr_mask), st_str(msg->sms.dlr_url),
         st_num(msg->sms.pid), st_num(msg->sms.alt_dcs), st_num(msg->sms.rpi), st_str(msg->sms.charset),
@@ -203,7 +263,49 @@
     while (stuffcount > 0) {
         octstr_destroy(stuffer[--stuffcount]);
     }
+    }
+/* ACK Type */
+    else if ( msg_type(msg) == ack /* && momt == octstr_imm("MT") */  && !uuid_is_null(msg->ack.id) ) {
+		uuid_unparse(msg->ack.id, local_uuid);
+		stored_local_uuid = octstr_create(local_uuid);
+		//debug("sqlbox", 0, "mysql_save_msg:ACK received. %i - %s", msg->ack.nack,octstr_get_cstr(stored_local_uuid));
+        switch (msg->ack.nack) {
+          /* 0: Accepted for delivery - HTTP_ACCEPTED - delete msg from sqlbox_insert_table and insert into sqlbox_logtable with x_status=10 */
+          case ack_success:
+            sql = octstr_format(X_SQLBOX_MYSQL_MOVE_QUERY, sqlbox_logtable_tmp, octstr_imm("10"), sqlbox_insert_table, stored_local_uuid);
+            sqld= octstr_format(X_SQLBOX_MYSQL_DELETE_QUERY, sqlbox_insert_table, stored_local_uuid);
+            sql_update(sql);
+            sql_update(sqld);
+            break;
+          /* 3: Queued for later delivery - HTTP_ACCEPTED - delete msg from sqlbox_insert_table and insert into sqlbox_logtable with x_status=13 */
+          case ack_buffered:
+            sql = octstr_format(X_SQLBOX_MYSQL_MOVE_QUERY, sqlbox_logtable_tmp, octstr_imm("13"), sqlbox_insert_table, stored_local_uuid);
+            sqld= octstr_format(X_SQLBOX_MYSQL_DELETE_QUERY, sqlbox_insert_table, stored_local_uuid);
+            sql_update(sql);
+            sql_update(sqld);
+            break;
+          /* 1: Not routable. Do not try again. - HTTP_FORBIDDEN - leave msg on sqlbox_insert_table and update x_status=11 */
+          case ack_failed:
+            sql = octstr_format(X_SQLBOX_MYSQL_UPDATE_STATUS_QUERY, sqlbox_insert_table, octstr_imm("11"), stored_local_uuid);
+            sql_update(sql);
+            break;
+          /* 2: Temporal failure, try again later. - HTTP_SERVICE_UNAVAILABLE - leave msg on sqlbox_insert_table and update x_status=12 (OR??? set x_status=1 to resend) */
+          case ack_failed_tmp:
+            sql = octstr_format(X_SQLBOX_MYSQL_UPDATE_STATUS_QUERY, sqlbox_insert_table, octstr_imm("12"), stored_local_uuid);
+            sql_update(sql);
+            break;
+          /* Strange reply from bearerbox! - HTTP_SERVICE_UNAVAILABLE - leave msg on sqlbox_insert_table and update x_status=14 */
+          default:
+            error(0, "Strange reply from bearerbox!");
+            sql = octstr_format(X_SQLBOX_MYSQL_UPDATE_STATUS_QUERY, sqlbox_insert_table, octstr_imm("14"), stored_local_uuid);
+            sql_update(sql);
+            break;
+        }    
+    }
     octstr_destroy(sql);
+	octstr_destroy(sqld);
+    octstr_destroy(sqlbox_logtable_tmp);
+    octstr_destroy(stored_local_uuid);
 }
 
 void mysql_leave()
Index: sqlbox_mysql.h
===================================================================
--- sqlbox_mysql.h	(revision 52)
+++ sqlbox_mysql.h	(working copy)
@@ -7,46 +7,58 @@
 momt ENUM('MO', 'MT', 'DLR') NULL, sender VARCHAR(20) NULL, \
 receiver VARCHAR(20) NULL, udhdata BLOB NULL, msgdata TEXT NULL, \
 time BIGINT(20) NULL, smsc_id VARCHAR(255) NULL, service VARCHAR(255) NULL, \
-account VARCHAR(255) NULL, id BIGINT(20) NULL, sms_type BIGINT(20) NULL, \
+account VARCHAR(255) NULL, id VARCHAR(40) NULL, sms_type BIGINT(20) NULL, \
 mclass BIGINT(20) NULL, mwi BIGINT(20) NULL, coding BIGINT(20) NULL, \
 compress BIGINT(20) NULL, validity BIGINT(20) NULL, deferred BIGINT(20) NULL, \
 dlr_mask BIGINT(20) NULL, dlr_url VARCHAR(255) NULL, pid BIGINT(20) NULL, \
 alt_dcs BIGINT(20) NULL, rpi BIGINT(20) NULL, charset VARCHAR(255) NULL, \
-boxc_id VARCHAR(255) NULL, binfo VARCHAR(255) NULL, meta_data TEXT)"
+boxc_id VARCHAR(255) NULL, binfo VARCHAR(255) NULL, meta_data TEXT, x_status tinyint(4) NULL)"
 
 #define SQLBOX_MYSQL_CREATE_INSERT_TABLE "CREATE TABLE IF NOT EXISTS %S ( \
 sql_id BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY, \
 momt ENUM('MO', 'MT') NULL, sender VARCHAR(20) NULL, \
 receiver VARCHAR(20) NULL, udhdata BLOB NULL, msgdata TEXT NULL, \
 time BIGINT(20) NULL, smsc_id VARCHAR(255) NULL, service VARCHAR(255) NULL, \
-account VARCHAR(255) NULL, id BIGINT(20) NULL, sms_type BIGINT(20) NULL, \
+account VARCHAR(255) NULL, id VARCHAR(40) NOT NULL, sms_type BIGINT(20) NULL, \
 mclass BIGINT(20) NULL, mwi BIGINT(20) NULL, coding BIGINT(20) NULL, \
 compress BIGINT(20) NULL, validity BIGINT(20) NULL, deferred BIGINT(20) NULL, \
 dlr_mask BIGINT(20) NULL, dlr_url VARCHAR(255) NULL, pid BIGINT(20) NULL, \
 alt_dcs BIGINT(20) NULL, rpi BIGINT(20) NULL, charset VARCHAR(255) NULL, \
-boxc_id VARCHAR(255) NULL, binfo VARCHAR(255) NULL, meta_data TEXT)"
+boxc_id VARCHAR(255) NULL, binfo VARCHAR(255) NULL, meta_data TEXT, x_status tinyint(4) NOT NULL DEFAULT '1')"
 
 #define SQLBOX_MYSQL_SELECT_QUERY "SELECT sql_id, momt, sender, receiver, udhdata, \
 msgdata, time, smsc_id, service, account, id, sms_type, mclass, mwi, coding, \
 compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \
-charset, boxc_id, binfo, meta_data FROM %S LIMIT 0,1"
+charset, boxc_id, binfo, meta_data FROM %S WHERE x_status = 1 LIMIT 0,%i"
 
 #define SQLBOX_MYSQL_INSERT_QUERY "INSERT INTO %S ( sql_id, momt, sender, \
-receiver, udhdata, msgdata, time, smsc_id, service, account, sms_type, \
+receiver, udhdata, msgdata, time, smsc_id, service, account, id, sms_type, \
 mclass, mwi, coding, compress, validity, deferred, dlr_mask, dlr_url, \
 pid, alt_dcs, rpi, charset, boxc_id, binfo, meta_data ) VALUES ( \
-NULL, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, \
+NULL, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, %S, \
 %S, %S, %S, %S, %S, %S, %S, %S, %S)"
 
 #define SQLBOX_MYSQL_DELETE_QUERY "DELETE FROM %S WHERE sql_id = %S"
 
+#define X_SQLBOX_MYSQL_UPDATE_ID_QUERY "UPDATE %S SET id = '%S', x_status = 2 WHERE sql_id = %S"
+
+#define X_SQLBOX_MYSQL_UPDATE_STATUS_QUERY "UPDATE %S SET x_status = %S WHERE id = '%S'"
+
+#define X_SQLBOX_MYSQL_DELETE_QUERY "DELETE FROM %S WHERE id = '%S'"
+
+#define X_SQLBOX_MYSQL_MOVE_QUERY "INSERT INTO %S (SELECT sql_id, momt, sender, receiver, udhdata, \
+msgdata, time, smsc_id, service, account, id, sms_type, mclass, mwi, coding, \
+compress, validity, deferred, dlr_mask, dlr_url, pid, alt_dcs, rpi, \
+charset, boxc_id, binfo, meta_data, %S FROM %S WHERE id = '%S' LIMIT 0,1)"
+
+
 #endif /* HAVE_MYSQL || HAVE_SDB */
 
 #ifdef HAVE_MYSQL
 #include "gw/msg.h"
 #include "sqlbox_sql.h"
 void sql_save_msg(Msg *msg, Octstr *momt);
-Msg *mysql_fetch_msg();
+int mysql_fetch_msg(List *list, long limit);
 void sql_shutdown();
 struct server_type *sqlbox_init_mysql(Cfg* cfg);
 #ifndef sqlbox_mysql_c
Index: sqlbox_sql.h
===================================================================
--- sqlbox_sql.h	(revision 52)
+++ sqlbox_sql.h	(working copy)
@@ -14,7 +14,7 @@
     Octstr *type;
     void (*sql_enter) (Cfg *);
     void (*sql_leave) ();
-    Msg *(*sql_fetch_msg) ();
+    int  (*sql_fetch_msg) (List *, long);
     void (*sql_save_msg) (Msg *, Octstr *);
 };
 
