new version with a small bug fix.
Alexander Malysh wrote:
> Hi all,
>
> attached you can find reworked MO concatenation patch from Paul Bagyenda.
> This patch should address all issues I can imagine of :)
>
> Please test and vote!
>
--
Thanks,
Alex
=== CVS/Entries
==================================================================
--- CVS/Entries (revision 288)
+++ CVS/Entries (local)
@@ -30,4 +30,4 @@
/configure.in/1.173/Wed Apr 11 17:06:10 2007//
/Makefile.in/1.80/Sun Apr 22 11:22:43 2007//
/aclocal.m4/1.10/Sun Apr 22 11:22:43 2007//
-/ChangeLog/1.2887/Wed May 9 09:11:23 2007//
+/ChangeLog/1.2888/Tue May 15 18:13:56 2007//
=== doc/userguide/userguide.xml
==================================================================
--- doc/userguide/userguide.xml (revision 288)
+++ doc/userguide/userguide.xml (local)
@@ -1534,6 +1534,22 @@
</entry>
</row>
+ <row><entry><literal>sms-combine-concatenated-mo</literal></entry>
+ <entry>boolean</entry>
+ <entry valign="bottom">
+ Whether Kannel should attempt to combine concatenated MO SMS
+ prior to passing them over to smsbox. Default is true.
+ </entry>
+ </row>
+
+ <row><entry><literal>sms-combine-concatenated-mo-timeout</literal></entry>
+ <entry>seconds</entry>
+ <entry valign="bottom">
+ How long to wait for all concatenated message parts to arrive before timing out.
+ Default 1800 seconds.
+ </entry>
+ </row>
+
</tbody>
</tgroup>
</table>
=== gw/bb_smscconn.c
==================================================================
--- gw/bb_smscconn.c (revision 288)
+++ gw/bb_smscconn.c (local)
@@ -106,6 +106,8 @@
/* outgoing sms queue control */
extern long max_outgoing_sms_qlength;
+/* incoming sms queue control */
+extern long max_incoming_sms_qlength;
/* our own thingies */
@@ -133,11 +135,22 @@
*/
Counter *split_msg_counter;
+/* Non-zero when concatenated incoming messages are combined (config: sms-combine-concatenated-mo). */
+static int handle_concatenated_mo;
+/* How long to wait for message parts, in seconds (config: sms-combine-concatenated-mo-timeout). */
+static long concatenated_mo_timeout;
+/* Return values of check_concatenation(); concat_none implicitly takes the next value, 2. */
+enum {concat_error = -1, concat_complete = 0, concat_pending = 1, concat_none};
+
/*
* forward declaration
*/
static long route_incoming_to_smsc(SMSCConn *conn, Msg *msg);
+static void init_concat_handler(void);
+static void shutdown_concat_handler(void);
+static int check_concatenation(Msg **msg, Octstr *smscid);
+static void clear_old_concat_parts(void);
/*---------------------------------------------------------------------------
* CALLBACK FUNCTIONS
@@ -386,11 +399,13 @@
/* fix sms type if not set already */
if (sms->sms.sms_type != report_mo)
- sms->sms.sms_type = mo;
+ sms->sms.sms_type = mo;
/* write to store (if enabled) */
- if (store_save(sms) == -1)
- return SMSCCONN_FAILED_TEMPORARILY;
+ if (store_save(sms) == -1) {
+ msg_destroy(sms);
+ return SMSCCONN_FAILED_TEMPORARILY;
+ }
copy = msg_duplicate(sms);
@@ -400,6 +415,36 @@
* Scope: internal routing (to smsc-ids)
*/
if ((rc = route_incoming_to_smsc(conn, copy)) == -1) {
+ int ret;
+ /* Before routing to some box, do concat handling
+ * and replace copy as such.
+ */
+ if (handle_concatenated_mo && copy->sms.sms_type == mo) {
+ ret = check_concatenation(&copy, conn->id);
+ switch(ret) {
+ case concat_pending:
+ counter_increase(incoming_sms_counter); /* ?? */
+ if (conn != NULL)
+ counter_increase(conn->received);
+ msg_destroy(sms);
+ return SMSCCONN_SUCCESS;
+ case concat_complete:
+ /* Combined sms received! save new one since it is now combined. */
+ msg_destroy(sms);
+ /* Change the sms. */
+ sms = msg_duplicate(copy);
+ break;
+ case concat_error:
+ /* failed to save, go away. */
+ msg_destroy(sms);
+ return SMSCCONN_FAILED_TEMPORARILY;
+ case concat_none:
+ break;
+ default:
+ panic(0, "Internal error: Unhandled concat result.");
+ break;
+ }
+ }
/*
* Now try to route the message to a specific smsbox
* connection based on the existing msg->sms.boxc_id or
@@ -452,14 +497,16 @@
{
Msg *msg, *startmsg, *newmsg;
long ret;
+ time_t concat_mo_check;
gwlist_add_producer(flow_threads);
gwthread_wakeup(MAIN_THREAD_ID);
startmsg = newmsg = NULL;
ret = SMSCCONN_SUCCESS;
+ concat_mo_check = time(NULL);
- while(bb_status != BB_DEAD) {
+ while(bb_status != BB_SHUTDOWN && bb_status != BB_DEAD) {
if (newmsg == startmsg) {
if (ret == SMSCCONN_QUEUED || ret == SMSCCONN_FAILED_QFULL) {
@@ -469,16 +516,22 @@
gwthread_sleep(sleep_time);
debug("bb.sms", 0, "sms_router: gwlist_len = %ld", gwlist_len(outgoing_sms));
}
- startmsg = msg = gwlist_consume(outgoing_sms);
+ startmsg = msg = gwlist_timed_consume(outgoing_sms, concatenated_mo_timeout);
newmsg = NULL;
+ } else {
+ newmsg = msg = gwlist_timed_consume(outgoing_sms, concatenated_mo_timeout);
}
- else {
- newmsg = msg = gwlist_consume(outgoing_sms);
+
+ if (difftime(time(NULL), concat_mo_check) > concatenated_mo_timeout) {
+ concat_mo_check = time(NULL);
+ clear_old_concat_parts();
}
- /* shutdown ? */
- if (msg == NULL)
- break;
+ /* shutdown or timeout */
+ if (msg == NULL) {
+ newmsg = startmsg = NULL;
+ continue;
+ }
debug("bb.sms", 0, "sms_router: handling message (%p vs %p)",
msg, startmsg);
@@ -577,16 +630,16 @@
else
info(0, "SMS resend retry set to %ld.", sms_resend_retry);
+ if (cfg_get_bool(&handle_concatenated_mo, grp, octstr_imm("sms-combine-concatenated-mo")) == -1)
+ handle_concatenated_mo = 1; /* default is TRUE. */
+
+ if (cfg_get_integer(&concatenated_mo_timeout, grp, octstr_imm("sms-combine-concatenated-mo-timeout")) == -1)
+ concatenated_mo_timeout = 1800;
+
+ if (handle_concatenated_mo)
+ init_concat_handler();
+
smsc_groups = cfg_get_multi_group(cfg, octstr_imm("smsc"));
- /*
- while(groups && (grp = gwlist_extract_first(groups)) != NULL) {
- conn = smscconn_create(grp, 1);
- if (conn == NULL)
- panic(0, "Cannot start with SMSC connection failing");
-
- gwlist_append(smsc_list, conn);
- }
- */
gwlist_add_producer(smsc_list);
for (i = 0; i < gwlist_len(smsc_groups) &&
(grp = gwlist_get(smsc_groups, i)) != NULL; i++) {
@@ -820,6 +873,9 @@
counter_destroy(split_msg_counter);
gw_rwlock_destroy(&smsc_list_lock);
+ /* Stop concat handling */
+ shutdown_concat_handler();
+
smsc_running = 0;
}
@@ -1134,3 +1190,297 @@
return -1;
}
+
+/*--------------------------------
+ * incoming concatenated messages handling
+ */
+
+typedef struct ConcatMsg { /* one concatenated MO message whose parts are still being collected */
+ int refnum; /* reference number read from the concatenation UDH element */
+ int total_parts; /* number of parts announced in the UDH; also the length of parts[] */
+ int num_parts; /* how many slots of parts[] are currently filled */
+ time_t trecv; /* time the most recent part was stored; compared against the timeout */
+ Octstr *key; /* copy of the key this entry is stored under in the dict. */
+ int ack; /* set to the type of ack to send (via store_save_ack) when deleting. */
+ /* array of total_parts message pointers, indexed by part number - 1; NULL = not received */
+ Msg **parts;
+} ConcatMsg;
+
+static Dict *incoming_concat_msgs; /* pending ConcatMsg entries by key; NULL while the handler is not initialised */
+static Mutex *concat_lock; /* held while dict entries are looked up or changed and while timeouted_message is accessed */
+static Msg *timeouted_message; /* message currently being re-submitted by clear_old_concat_parts(), otherwise NULL */
+
+static void destroy_concatMsg(void *x)
+{
+    /* Dict value destructor: ack and free every part still held, then the entry itself. */
+    ConcatMsg *entry = x;
+    int slot;
+    gw_assert(entry);
+    for (slot = 0; slot < entry->total_parts; slot++) {
+        if (entry->parts[slot] == NULL)
+            continue; /* empty slot, nothing to ack or free */
+        store_save_ack(entry->parts[slot], entry->ack);
+        msg_destroy(entry->parts[slot]);
+    }
+    gw_free(entry->parts);
+    octstr_destroy(entry->key);
+    gw_free(entry);
+}
+
+static void init_concat_handler(void)
+{
+    /* Create the dict of pending messages and its lock, unless that was already done. */
+    if (incoming_concat_msgs == NULL) {
+        incoming_concat_msgs = dict_create(max_incoming_sms_qlength > 0 ? max_incoming_sms_qlength : 1024, destroy_concatMsg);
+        concat_lock = mutex_create();
+        debug("bb.sms",0,"smsbox MO concatenated message handling enabled");
+    }
+}
+
+static void shutdown_concat_handler(void)
+{
+    /* Release the dict and its lock; does nothing when the handler was never initialised. */
+    if (incoming_concat_msgs != NULL) {
+        dict_destroy(incoming_concat_msgs);
+        mutex_destroy(concat_lock);
+        incoming_concat_msgs = NULL;
+        concat_lock = NULL;
+        debug("bb.sms",0,"smsbox MO concatenated message handling cleaned up");
+    }
+}
+
+static void clear_old_concat_parts(void)
+{
+    /* Flush entries that waited at least concatenated_mo_timeout seconds: each part
+     * collected so far is re-submitted as is through bb_smscconn_receive(); when a
+     * part cannot be delivered the entry goes back into the dict for the next run. */
+    List *keys;
+    Octstr *key;
+
+    /* not initialised, go away */
+    if (incoming_concat_msgs == NULL)
+        return;
+    debug("bb.sms.splits", 0, "clear_old_concat_parts called");
+    /* Remove any pending messages that are too old. */
+    keys = dict_keys(incoming_concat_msgs);
+    while((key = gwlist_extract_first(keys)) != NULL) {
+        ConcatMsg *x;
+        Msg *msg;
+        int i, destroy = 1;
+
+        mutex_lock(concat_lock);
+        x = dict_get(incoming_concat_msgs, key);
+        octstr_destroy(key);
+        if (x == NULL || difftime(time(NULL), x->trecv) < concatenated_mo_timeout) {
+            mutex_unlock(concat_lock);
+            continue;
+        }
+        dict_remove(incoming_concat_msgs, x->key); /* detach the entry; x is still used below */
+        mutex_unlock(concat_lock);
+        warning(0, "Time-out waiting for concatenated message '%s'. Send message parts as is.",
+                octstr_get_cstr(x->key));
+        for (i = 0; i < x->total_parts && destroy == 1; i++) {
+            if (x->parts[i] == NULL)
+                continue;
+            msg = msg_duplicate(x->parts[i]);
+            mutex_lock(concat_lock);
+            timeouted_message = msg; /* check_concatenation() compares incoming messages against this */
+            mutex_unlock(concat_lock);
+            store_save_ack(x->parts[i], ack_success);
+            switch(bb_smscconn_receive(NULL, msg)) {
+            case SMSCCONN_FAILED_REJECTED:
+            case SMSCCONN_SUCCESS:
+                msg_destroy(x->parts[i]);
+                x->parts[i] = NULL;
+                x->num_parts--;
+                break;
+            case SMSCCONN_FAILED_TEMPORARILY:
+            case SMSCCONN_FAILED_QFULL:
+            default:
+                /* oops put it back into dict and retry on next run */
+                store_save(x->parts[i]);
+                destroy = 0;
+                break;
+            }
+            mutex_lock(concat_lock);
+            timeouted_message = NULL;
+            mutex_unlock(concat_lock);
+        }
+        if (destroy) {
+            destroy_concatMsg(x);
+        } else {
+            ConcatMsg *x1;
+            mutex_lock(concat_lock);
+            x1 = dict_get(incoming_concat_msgs, x->key);
+            if (x1 != NULL) { /* oops we have new part */
+                if (x->total_parts != x1->total_parts) {
+                    /* broken handset, don't know what to do here?? For now just put
+                     * the old concatMsg into the dict under another key and it will
+                     * be cleaned up on next run. */
+                    octstr_format_append(x->key, " %d", x->total_parts);
+                    dict_put(incoming_concat_msgs, x->key, x);
+                } else {
+                    for (i = 0; i < x->total_parts; i++) {
+                        if (x->parts[i] == NULL)
+                            continue;
+                        if (x1->parts[i] == NULL) {
+                            x1->parts[i] = x->parts[i];
+                            x1->num_parts++; /* keep the count in step with the filled slots */
+                            x->parts[i] = NULL;
+                        }
+                    }
+                    destroy_concatMsg(x); /* acks and frees the parts that were not moved over */
+                }
+            } else {
+                dict_put(incoming_concat_msgs, x->key, x);
+            }
+            mutex_unlock(concat_lock);
+        }
+    }
+    gwlist_destroy(keys, octstr_destroy_item);
+}
+
+/* Collects the parts of a concatenated MO message (UDH element 0 or 8). Returns:
+ * - concat_none if there is no usable concat UDH; *pmsg is left untouched
+ * - concat_pending (and sets *pmsg to NULL) if parts pending; the part is kept here
+ * - concat_complete with *pmsg replaced by the newly combined message
+ * - concat_error (*pmsg set to NULL) on a total-parts mismatch or if store_save fails */
+static int check_concatenation(Msg **pmsg, Octstr *smscid)
+{
+    Msg *msg = *pmsg;
+    int l, iel, refnum, pos, c, part, totalparts, i, sixteenbit;
+    Octstr *udh = msg->sms.udhdata, *key;
+    ConcatMsg *cmsg;
+    int ret = concat_complete;
+
+    /* ... module not initialised or there is no UDH. */
+    if (incoming_concat_msgs == NULL || (l = octstr_len(udh)) == 0)
+        return concat_none;
+
+    /* check if this is timeouted message */
+    mutex_lock(concat_lock);
+    if (timeouted_message == msg) {
+        mutex_unlock(concat_lock);
+        return concat_none;
+    }
+    mutex_unlock(concat_lock);
+
+    for (pos = 1, c = -1; pos < l - 1; pos += iel + 2) { /* step over one element: id, length, data */
+        iel = octstr_get_char(udh, pos + 1);
+        if ((c = octstr_get_char(udh,pos)) == 0 || c == 8)
+            break;
+    }
+    if (c != 0 && c != 8) /* scan ended without hitting a concat element: no concat UDH found. */
+        return concat_none;
+
+    /* c = 0 means 8 bit, c = 8 means 16 bit concat info */
+    sixteenbit = (c == 8);
+    refnum = (!sixteenbit) ? octstr_get_char(udh, pos + 2) :
+        (octstr_get_char(udh, pos + 2) << 8) | octstr_get_char(udh, pos + 3);
+    totalparts = octstr_get_char(udh, pos + 3 + sixteenbit);
+    part = octstr_get_char(udh, pos + 4 + sixteenbit);
+
+    if (part < 1 || part > totalparts) {
+        warning(0, "Invalid concatenation UDH [ref = %d] in message from %s!",
+                refnum, octstr_get_cstr(msg->sms.sender));
+        return concat_none;
+    }
+
+    debug("bb.sms.splits", 0, "Got part %d [ref %d, total parts %d] of message from %s. Dump follows:",
+          part, refnum,totalparts, octstr_get_cstr(msg->sms.sender));
+
+    msg_dump(msg,0);
+
+    key = octstr_format("%S %S %S %d", msg->sms.sender, msg->sms.receiver, smscid, refnum);
+    mutex_lock(concat_lock);
+    if ((cmsg = dict_get(incoming_concat_msgs, key)) == NULL) {
+        cmsg = gw_malloc(sizeof(*cmsg));
+        cmsg->refnum = refnum;
+        cmsg->total_parts = totalparts;
+        cmsg->num_parts = 0;
+        cmsg->key = octstr_duplicate(key);
+        cmsg->ack = ack_success;
+        cmsg->parts = gw_malloc(totalparts * sizeof(*cmsg->parts));
+        memset(cmsg->parts, 0, cmsg->total_parts * sizeof(*cmsg->parts)); /* clear it. */
+
+        dict_put(incoming_concat_msgs, key, cmsg);
+    }
+    octstr_destroy(key);
+
+    if (totalparts != cmsg->total_parts) {
+        /* totalparts in udh and cmsg not equal assume bad message */
+        error(0, "Totalparts in UDH doesn't match received before, "
+              "total parts <%d>:<%d> part %d, ref %d, from %s, to %s. Discarded!",
+              cmsg->total_parts, totalparts, part, refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+        mutex_unlock(concat_lock);
+        store_save_ack(msg, ack_success);
+        msg_destroy(msg);
+        *pmsg = msg = NULL;
+        return concat_error;
+    }
+
+    /* check if we have seen message part before... */
+    if (cmsg->parts[part - 1] != NULL) {
+        warning(0, "Duplicate message part %d, ref %d, from %s, to %s. Discarded!",
+                part, refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+        store_save_ack(msg, ack_success);
+        msg_destroy(msg);
+        *pmsg = msg = NULL;
+    } else {
+        cmsg->parts[part -1] = msg; /* the dict entry keeps this part from now on */
+        cmsg->num_parts++;
+        /* always update receive time so we have it from last part and don't timeout */
+        cmsg->trecv = time(NULL);
+    }
+
+    if (cmsg->num_parts < cmsg->total_parts) { /* wait for more parts. */
+        *pmsg = msg = NULL;
+        mutex_unlock(concat_lock);
+        return concat_pending;
+    }
+
+    /* we have all the parts: Put them together, mod UDH, return message. */
+    msg = msg_duplicate(cmsg->parts[0]);
+    uuid_generate(msg->sms.id); /* give it a new ID. */
+
+    debug("bb.sms.splits",0,"Received all concatenated message parts from %s, to %s, refnum %d",
+          octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver), refnum);
+
+    for (i = 1; i < cmsg->total_parts; i++)
+        octstr_append(msg->sms.msgdata, cmsg->parts[i]->sms.msgdata);
+
+    /* Attempt to save the new one, if that fails, then reply with fail. */
+    if (store_save(msg) == -1) {
+        mutex_unlock(concat_lock);
+        msg_destroy(msg);
+        *pmsg = msg = NULL;
+        return concat_error;
+    } else
+        *pmsg = msg; /* hand back the combined message. */
+
+    /* Delete it from the queue and from the Dict. */
+    /* Note: dict_put with NULL value delete and destroy value */
+    dict_put(incoming_concat_msgs, cmsg->key, NULL);
+    mutex_unlock(concat_lock);
+
+    /* fix up UDH: drop the concat element, then correct or clear the length octet */
+    udh = msg->sms.udhdata;
+    l = octstr_len(udh);
+    for (pos = 1; pos < l - 1; pos += iel + 2) {
+        iel = octstr_get_char(udh, pos + 1);
+        if ((c = octstr_get_char(udh, pos)) == 0 || c == 8) {
+            octstr_delete(udh, pos, iel + 2);
+
+            if (octstr_len(udh) <= 1) /* no other UDH elements. */
+                octstr_delete(udh, 0, octstr_len(udh));
+            else
+                octstr_set_char(udh, 0, octstr_len(udh) - 1);
+            break;
+        }
+    }
+    debug("bb.sms.splits", 0, "Got full message [ref %d] of message from %s to %s. Dumping: ",
+          refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+    msg_dump(msg,0);
+
+    return ret;
+}
=== gwlib/CVS/Entries
==================================================================
--- gwlib/CVS/Entries (revision 288)
+++ gwlib/CVS/Entries (local)
@@ -45,8 +45,6 @@
/http.c/1.248/Sun Jan 14 19:21:31 2007//
/http.h/1.70/Sun Jan 14 19:21:31 2007//
/latin1_to_gsm.h/1.1/Sat Oct 7 15:18:04 2006//
-/list.c/1.45/Sun Jan 14 19:21:31 2007//
-/list.h/1.26/Sun Jan 14 19:21:31 2007//
/log.h/1.24/Sun Jan 14 19:21:32 2007//
/md5.c/1.8/Sun Jan 14 19:21:31 2007//
/md5.h/1.7/Sun Jan 14 19:21:31 2007//
@@ -81,4 +79,6 @@
/cfg.def/1.128/Wed Mar 21 22:22:14 2007//
/gwlib.c/1.24/Wed May 9 09:11:24 2007//
/log.c/1.53/Wed May 9 09:11:17 2007//
+/list.c/1.46/Tue May 15 18:09:17 2007//
+/list.h/1.27/Tue May 15 16:15:00 2007//
D
=== gwlib/cfg.def
==================================================================
--- gwlib/cfg.def (revision 288)
+++ gwlib/cfg.def (local)
@@ -123,6 +123,8 @@
OCTSTR(sms-outgoing-queue-limit)
OCTSTR(sms-resend-freq)
OCTSTR(sms-resend-retry)
+ OCTSTR(sms-combine-concatenated-mo)
+ OCTSTR(sms-combine-concatenated-mo-timeout)
)