new version with a small bug fix.

Alexander Malysh wrote:

> Hi all,
> 
> attached you can find reworked MO concatenation patch from Paul Bagyenda.
> This patch should address all issues I can think of :)
> 
> Please test and vote!
> 

-- 
Thanks,
Alex
=== CVS/Entries
==================================================================
--- CVS/Entries	(revision 288)
+++ CVS/Entries	(local)
@@ -30,4 +30,4 @@
 /configure.in/1.173/Wed Apr 11 17:06:10 2007//
 /Makefile.in/1.80/Sun Apr 22 11:22:43 2007//
 /aclocal.m4/1.10/Sun Apr 22 11:22:43 2007//
-/ChangeLog/1.2887/Wed May  9 09:11:23 2007//
+/ChangeLog/1.2888/Tue May 15 18:13:56 2007//
=== doc/userguide/userguide.xml
==================================================================
--- doc/userguide/userguide.xml	(revision 288)
+++ doc/userguide/userguide.xml	(local)
@@ -1534,6 +1534,22 @@
         </entry>   
      </row>
 
+     <row><entry><literal>sms-combine-concatenated-mo</literal></entry>
+        <entry>boolean</entry>
+        <entry valign="bottom">
+        Whether Kannel should attempt to combine concatenated MO SMS
+        prior to passing them over to smsbox. Default is true.
+        </entry>   
+     </row>
+
+     <row><entry><literal>sms-combine-concatenated-mo-timeout</literal></entry>
+        <entry>seconds</entry>
+        <entry valign="bottom">
+        How long to wait for all concatenated message parts to arrive before timing out.
+        Default is 1800 seconds.
+        </entry>   
+     </row>
+
   </tbody>
   </tgroup>
  </table>
=== gw/bb_smscconn.c
==================================================================
--- gw/bb_smscconn.c	(revision 288)
+++ gw/bb_smscconn.c	(local)
@@ -106,6 +106,8 @@
 
 /* outgoing sms queue control */
 extern long max_outgoing_sms_qlength;
+/* incoming sms queue control */
+extern long max_incoming_sms_qlength;
 
 /* our own thingies */
 
@@ -133,11 +135,22 @@
  */
 Counter *split_msg_counter;
 
+/* Flag for handling concatenated incoming messages. */
+static int handle_concatenated_mo;
+/* How long to wait for message parts */
+static long concatenated_mo_timeout;
+/* Return values of check_concatenation() */
+enum {concat_error = -1, concat_complete = 0, concat_pending = 1, concat_none};
+
 /*
  * forward declaration
  */
 static long route_incoming_to_smsc(SMSCConn *conn, Msg *msg);
 
+static void init_concat_handler(void);
+static void shutdown_concat_handler(void);
+static int check_concatenation(Msg **msg, Octstr *smscid);
+static void clear_old_concat_parts(void);
 
 /*---------------------------------------------------------------------------
  * CALLBACK FUNCTIONS
@@ -386,11 +399,13 @@
 
     /* fix sms type if not set already */
     if (sms->sms.sms_type != report_mo)
-	sms->sms.sms_type = mo;
+        sms->sms.sms_type = mo;
 
     /* write to store (if enabled) */
-    if (store_save(sms) == -1)
-	return SMSCCONN_FAILED_TEMPORARILY;
+    if (store_save(sms) == -1) {
+        msg_destroy(sms);
+        return SMSCCONN_FAILED_TEMPORARILY;
+    }
 
     copy = msg_duplicate(sms);
 
@@ -400,6 +415,36 @@
      * Scope: internal routing (to smsc-ids)
      */
     if ((rc = route_incoming_to_smsc(conn, copy)) == -1) {
+        int ret;
+        /* Before routing to some box, do concat handling
+         * and replace copy as such.
+         */
+        if (handle_concatenated_mo && copy->sms.sms_type == mo) {
+            ret = check_concatenation(&copy, conn->id);
+            switch(ret) {
+            case concat_pending:
+                counter_increase(incoming_sms_counter); /* ?? */
+                if (conn != NULL)
+                    counter_increase(conn->received);
+                msg_destroy(sms);
+                return SMSCCONN_SUCCESS;
+            case concat_complete:
+                /* Combined sms received! save new one since it is now combined. */ 
+                msg_destroy(sms);
+                /* Change the sms. */
+                sms = msg_duplicate(copy);
+                break;
+            case concat_error:
+                /* failed to save, go away. */
+                msg_destroy(sms);
+                return SMSCCONN_FAILED_TEMPORARILY;
+            case concat_none:
+                break;
+            default:
+                panic(0, "Internal error: Unhandled concat result.");
+                break;
+            }
+        }
         /*
          * Now try to route the message to a specific smsbox
          * connection based on the existing msg->sms.boxc_id or
@@ -452,14 +497,16 @@
 {
     Msg *msg, *startmsg, *newmsg;
     long ret;
+    time_t concat_mo_check;
 
     gwlist_add_producer(flow_threads);
     gwthread_wakeup(MAIN_THREAD_ID);
 
     startmsg = newmsg = NULL;
     ret = SMSCCONN_SUCCESS;
+    concat_mo_check = time(NULL);
 
-    while(bb_status != BB_DEAD) {
+    while(bb_status != BB_SHUTDOWN && bb_status != BB_DEAD) {
 
 	if (newmsg == startmsg) {
             if (ret == SMSCCONN_QUEUED || ret == SMSCCONN_FAILED_QFULL) {
@@ -469,16 +516,22 @@
                 gwthread_sleep(sleep_time);
                 debug("bb.sms", 0, "sms_router: gwlist_len = %ld", gwlist_len(outgoing_sms));
             }
-            startmsg = msg = gwlist_consume(outgoing_sms);
+            startmsg = msg = gwlist_timed_consume(outgoing_sms, concatenated_mo_timeout);
             newmsg = NULL;
+        } else {
+            newmsg = msg = gwlist_timed_consume(outgoing_sms, concatenated_mo_timeout);
         }
-        else {
-            newmsg = msg = gwlist_consume(outgoing_sms);
+
+        if (difftime(time(NULL), concat_mo_check) > concatenated_mo_timeout) {
+            concat_mo_check = time(NULL);
+            clear_old_concat_parts();
         }
 
-        /* shutdown ? */
-        if (msg == NULL)
-            break;
+        /* shutdown or timeout */
+        if (msg == NULL) {
+            newmsg = startmsg = NULL;
+            continue;
+        }
 
         debug("bb.sms", 0, "sms_router: handling message (%p vs %p)",
                   msg, startmsg);
@@ -577,16 +630,16 @@
     else
         info(0, "SMS resend retry set to %ld.", sms_resend_retry);
 
+    if (cfg_get_bool(&handle_concatenated_mo, grp, octstr_imm("sms-combine-concatenated-mo")) == -1)
+        handle_concatenated_mo = 1; /* default is TRUE. */
+
+    if (cfg_get_integer(&concatenated_mo_timeout, grp, octstr_imm("sms-combine-concatenated-mo-timeout")) == -1)
+        concatenated_mo_timeout = 1800;
+
+    if (handle_concatenated_mo)
+        init_concat_handler();
+
     smsc_groups = cfg_get_multi_group(cfg, octstr_imm("smsc"));
-    /*
-    while(groups && (grp = gwlist_extract_first(groups)) != NULL) {
-        conn = smscconn_create(grp, 1); 
-        if (conn == NULL)
-            panic(0, "Cannot start with SMSC connection failing");
-        
-        gwlist_append(smsc_list, conn);
-    }
-    */
     gwlist_add_producer(smsc_list);
     for (i = 0; i < gwlist_len(smsc_groups) && 
         (grp = gwlist_get(smsc_groups, i)) != NULL; i++) {
@@ -820,6 +873,9 @@
     counter_destroy(split_msg_counter);
     gw_rwlock_destroy(&smsc_list_lock);
 
+    /* Stop concat handling */
+    shutdown_concat_handler();
+
     smsc_running = 0;
 }
 
@@ -1134,3 +1190,297 @@
     return -1; 
 }
 
+
+/*--------------------------------
+ * incoming concatenated messages handling
+ */
+
+typedef struct ConcatMsg {
+    int refnum; /* concatenation reference number read from the UDH */
+    int total_parts; /* total number of parts announced in the UDH; length of parts[] */
+    int num_parts; /* parts received so far; decremented as timed-out parts are forwarded */
+    time_t trecv; /* time the most recent part was stored; basis for the timeout check */
+    Octstr *key; /* in dict. */
+    int ack;     /* set to the type of ack to send when deleting. */
+    /* array of total_parts slots, indexed by part number - 1; NULL means not received */
+    Msg **parts;
+} ConcatMsg;
+
+static Dict *incoming_concat_msgs; /* pending ConcatMsg entries keyed by "sender receiver smscid refnum"; NULL while the handler is not initialised */
+static Mutex *concat_lock; /* locked around dict lookups/updates and timeouted_message accesses in the functions below */
+static Msg *timeouted_message; /* part currently being re-submitted by clear_old_concat_parts(), otherwise NULL */
+
+/* Frees a ConcatMsg: acks (with ->ack) and destroys every stored part, then the entry. Registered with dict_create(). */
+static void destroy_concatMsg(void *x)
+{
+    ConcatMsg *cmsg = x;
+    int idx;
+    gw_assert(cmsg);
+    for (idx = 0; idx < cmsg->total_parts; idx++) {
+        if (cmsg->parts[idx] == NULL)
+            continue; /* slot never filled, nothing to ack or free */
+        store_save_ack(cmsg->parts[idx], cmsg->ack);
+        msg_destroy(cmsg->parts[idx]);
+    }
+    gw_free(cmsg->parts);
+    octstr_destroy(cmsg->key);
+    gw_free(cmsg);
+}
+
+/* Creates the concat Dict and its lock; does nothing when they already exist. */
+static void init_concat_handler(void)
+{
+    if (incoming_concat_msgs == NULL) {
+        incoming_concat_msgs = dict_create(max_incoming_sms_qlength > 0 ? max_incoming_sms_qlength : 1024, destroy_concatMsg);
+        concat_lock = mutex_create();
+        debug("bb.sms",0,"smsbox MO concatenated message handling enabled");
+    }
+}
+
+/* Destroys the concat Dict and its lock and resets both pointers; does nothing when never initialised. */
+static void shutdown_concat_handler(void)
+{
+    if (incoming_concat_msgs != NULL) {
+        dict_destroy(incoming_concat_msgs);
+        mutex_destroy(concat_lock);
+        incoming_concat_msgs = NULL;
+        concat_lock = NULL;
+        debug("bb.sms",0,"smsbox MO concatenated message handling cleaned up");
+    }
+}
+
+/* Flushes concat entries older than concatenated_mo_timeout: forwards their parts one by one, re-queues the entry on failure. */
+static void clear_old_concat_parts(void)
+{
+    List *keys;
+    Octstr *key;
+
+    /* not initialised (handler never set up or already shut down), go away */
+    if (incoming_concat_msgs == NULL)
+        return;
+    debug("bb.sms.splits", 0, "clear_old_concat_parts called");
+
+    /* Remove any pending messages that are too old. */
+    keys = dict_keys(incoming_concat_msgs);
+    while((key = gwlist_extract_first(keys)) != NULL) {
+        ConcatMsg *x;
+        Msg *msg;
+        int i, destroy = 1;
+
+        mutex_lock(concat_lock);
+        x = dict_get(incoming_concat_msgs, key);
+        octstr_destroy(key);
+        if (x == NULL || difftime(time(NULL), x->trecv) < concatenated_mo_timeout) {
+            mutex_unlock(concat_lock);
+            continue;
+        }
+        dict_remove(incoming_concat_msgs, x->key);
+        mutex_unlock(concat_lock);
+        warning(0, "Time-out waiting for concatenated message '%s'. Send message parts as is.",
+                octstr_get_cstr(x->key));
+        for (i = 0; i < x->total_parts && destroy == 1; i++) {
+            if (x->parts[i] == NULL)
+                continue;
+            msg = msg_duplicate(x->parts[i]);
+            mutex_lock(concat_lock);
+            timeouted_message = msg;
+            mutex_unlock(concat_lock);
+            store_save_ack(x->parts[i], ack_success);
+            switch(bb_smscconn_receive(NULL, msg)) { /* NOTE(review): conn is NULL here - confirm the receive path tolerates it */
+            case SMSCCONN_FAILED_REJECTED:
+            case SMSCCONN_SUCCESS:
+                msg_destroy(x->parts[i]);
+                x->parts[i] = NULL;
+                x->num_parts--;
+                break;
+            case SMSCCONN_FAILED_TEMPORARILY:
+            case SMSCCONN_FAILED_QFULL:
+            default:
+                /* oops put it back into dict and retry on next run */
+                store_save(x->parts[i]);
+                destroy = 0;
+                break;
+            }
+            mutex_lock(concat_lock);
+            timeouted_message = NULL;
+            mutex_unlock(concat_lock);
+        }
+        if (destroy) {
+            destroy_concatMsg(x);
+        } else {
+            ConcatMsg *x1;
+            mutex_lock(concat_lock);
+            x1 = dict_get(incoming_concat_msgs, x->key);
+            if (x1 != NULL) { /* oops we have new part */
+                int i;
+                if (x->total_parts != x1->total_parts) {
+                    /* broken handset, don't know what todo here??
+                     * for now just put old concatMsg into dict with
+                     * another key and it will be cleaned up on next run.
+                     */
+                    octstr_format_append(x->key, " %d", x->total_parts);
+                    dict_put(incoming_concat_msgs, x->key, x);
+                } else {
+                    for (i = 0; i < x->total_parts; i++) {
+                        if (x->parts[i] == NULL)
+                            continue;
+                        if (x1->parts[i] == NULL) {
+                            x1->parts[i] = x->parts[i];
+                            x->parts[i] = NULL;
+                        }
+                    }
+                    destroy_concatMsg(x);
+                }
+            } else {
+                dict_put(incoming_concat_msgs, x->key, x);
+            }
+            mutex_unlock(concat_lock);
+        }
+    }
+    gwlist_destroy(keys, octstr_destroy_item);
+}
+
+/* Collects one part of a concatenated MO message. Returns:
+ * - concat_none: handler off, no/invalid concat UDH, or msg is the timeouted one; *pmsg untouched
+ * - concat_pending: part stored (or duplicate discarded), more parts awaited; *pmsg set to NULL
+ * - concat_complete: *pmsg replaced by the combined message; concat_error: failure, *pmsg set to NULL
+ */
+static int check_concatenation(Msg **pmsg, Octstr *smscid)
+{
+    Msg *msg = *pmsg;
+    int l, iel, refnum, pos, c, part, totalparts, i, sixteenbit;
+    Octstr *udh = msg->sms.udhdata, *key;
+    ConcatMsg *cmsg;
+    int ret = concat_complete;
+
+    /* ... module not initialised or there is no UDH. */
+    if (incoming_concat_msgs == NULL || (l = octstr_len(udh)) == 0)
+        return concat_none;
+
+    /* check if this is timeouted message. NOTE(review): the visible caller passes a msg_duplicate() copy, so this pointer compare may never match - confirm */
+    mutex_lock(concat_lock);
+    if (timeouted_message == msg) {
+        mutex_unlock(concat_lock);
+        return concat_none;
+    }
+    mutex_unlock(concat_lock);
+
+    for (pos = 1, c = -1; pos < l - 1; pos += iel + 2) { /* walk the UDH elements: id at pos, length at pos + 1 */
+        iel = octstr_get_char(udh, pos + 1);
+        if ((c = octstr_get_char(udh,pos)) == 0 || c == 8)
+            break;
+    }
+    if (pos >= l)  /* no concat UDH found. */
+        return concat_none;
+
+    /* c = 0 means 8 bit, c = 8 means 16 bit concat info */
+    sixteenbit = (c == 8);
+    refnum = (!sixteenbit) ? octstr_get_char(udh, pos + 2) :
+        (octstr_get_char(udh, pos + 2) << 8) | octstr_get_char(udh, pos + 3);
+    totalparts = octstr_get_char(udh, pos + 3 + sixteenbit);
+    part = octstr_get_char(udh, pos + 4 + sixteenbit);
+
+    if (part < 1 || part > totalparts) {
+        warning(0, "Invalid concatenation UDH [ref = %d] in message from %s!",
+                refnum, octstr_get_cstr(msg->sms.sender));
+        return concat_none;
+    }
+
+    debug("bb.sms.splits", 0, "Got part %d [ref %d, total parts %d] of message from %s. Dump follows:",
+          part, refnum,totalparts, octstr_get_cstr(msg->sms.sender));
+     
+    msg_dump(msg,0);
+     
+    key = octstr_format("%S %S %S %d", msg->sms.sender, msg->sms.receiver, smscid, refnum);
+    mutex_lock(concat_lock);
+    if ((cmsg = dict_get(incoming_concat_msgs, key)) == NULL) { /* first part seen for this key: create the entry */
+        cmsg = gw_malloc(sizeof(*cmsg));
+        cmsg->refnum = refnum;
+        cmsg->total_parts = totalparts;
+        cmsg->num_parts = 0;
+        cmsg->key = octstr_duplicate(key);
+        cmsg->ack = ack_success;
+        cmsg->parts = gw_malloc(totalparts * sizeof(*cmsg->parts));
+        memset(cmsg->parts, 0, cmsg->total_parts * sizeof(*cmsg->parts)); /* clear it. */
+
+        dict_put(incoming_concat_msgs, key, cmsg);
+    }
+    octstr_destroy(key);
+
+    if (totalparts != cmsg->total_parts) {
+        /* totalparts in udh and cmsg not equal assume bad message */
+        error(0, "Totalparts in UDH doesn't match received before, "
+                "total parts <%d>:<%d> part %d, ref %d, from %s, to %s. Discarded!",
+                cmsg->total_parts, totalparts, part, refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+        mutex_unlock(concat_lock);
+        store_save_ack(msg, ack_success);
+        msg_destroy(msg);
+        *pmsg = msg = NULL;
+        return concat_error;
+    }
+
+    /* check if we have seen message part before... */
+    if (cmsg->parts[part - 1] != NULL) {	  
+        warning(0, "Duplicate message part %d, ref %d, from %s, to %s. Discarded!",
+                part, refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+        store_save_ack(msg, ack_success);
+        msg_destroy(msg); 
+        *pmsg = msg = NULL;
+    } else {
+        cmsg->parts[part -1] = msg;
+        cmsg->num_parts++;
+        /* always update receive time so we have it from last part and don't timeout */
+        cmsg->trecv = time(NULL);
+    }
+
+    if (cmsg->num_parts < cmsg->total_parts) {  /* wait for more parts. */
+        *pmsg = msg = NULL;
+        mutex_unlock(concat_lock);
+        return concat_pending;
+    }
+
+    /* we have all the parts: Put them together, mod UDH, return message. */
+    msg = msg_duplicate(cmsg->parts[0]);
+    uuid_generate(msg->sms.id); /* give it a new ID. */
+
+    debug("bb.sms.splits",0,"Received all concatenated message parts from %s, to %s, refnum %d",
+          octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver), refnum);
+
+    for (i = 1; i < cmsg->total_parts; i++)
+        octstr_append(msg->sms.msgdata, cmsg->parts[i]->sms.msgdata);
+
+    /* Attempt to save the new one, if that fails, then reply with fail. */
+    if (store_save(msg) == -1) {	  
+        mutex_unlock(concat_lock);
+        msg_destroy(msg);
+        *pmsg = msg = NULL;
+        return concat_error;
+    } else 
+        *pmsg = msg; /* return the combined message. */
+
+    /* Delete it from the queue and from the Dict. */
+    /* Note: dict_put with NULL value delete and destroy value */
+    dict_put(incoming_concat_msgs, cmsg->key, NULL);
+    mutex_unlock(concat_lock);
+
+    /* fix up UDH: drop the concat element and rewrite the length byte at offset 0 */
+    udh = msg->sms.udhdata;
+    l = octstr_len(udh);
+    for (pos = 1; pos < l - 1; pos += iel + 2) {
+        iel = octstr_get_char(udh, pos + 1);
+        if ((c = octstr_get_char(udh, pos)) == 0 || c == 8) {
+            octstr_delete(udh, pos, iel + 2);
+
+            if (octstr_len(udh) <= 1) /* no other UDH elements. */
+                octstr_delete(udh, 0, octstr_len(udh));
+            else
+                octstr_set_char(udh, 0, octstr_len(udh) - 1);
+            break;
+        }
+    }
+    debug("bb.sms.splits", 0, "Got full message [ref %d] of message from %s to %s. Dumping: ",
+          refnum, octstr_get_cstr(msg->sms.sender), octstr_get_cstr(msg->sms.receiver));
+    msg_dump(msg,0);
+
+    return ret;
+}
=== gwlib/CVS/Entries
==================================================================
--- gwlib/CVS/Entries	(revision 288)
+++ gwlib/CVS/Entries	(local)
@@ -45,8 +45,6 @@
 /http.c/1.248/Sun Jan 14 19:21:31 2007//
 /http.h/1.70/Sun Jan 14 19:21:31 2007//
 /latin1_to_gsm.h/1.1/Sat Oct  7 15:18:04 2006//
-/list.c/1.45/Sun Jan 14 19:21:31 2007//
-/list.h/1.26/Sun Jan 14 19:21:31 2007//
 /log.h/1.24/Sun Jan 14 19:21:32 2007//
 /md5.c/1.8/Sun Jan 14 19:21:31 2007//
 /md5.h/1.7/Sun Jan 14 19:21:31 2007//
@@ -81,4 +79,6 @@
 /cfg.def/1.128/Wed Mar 21 22:22:14 2007//
 /gwlib.c/1.24/Wed May  9 09:11:24 2007//
 /log.c/1.53/Wed May  9 09:11:17 2007//
+/list.c/1.46/Tue May 15 18:09:17 2007//
+/list.h/1.27/Tue May 15 16:15:00 2007//
 D
=== gwlib/cfg.def
==================================================================
--- gwlib/cfg.def	(revision 288)
+++ gwlib/cfg.def	(local)
@@ -123,6 +123,8 @@
     OCTSTR(sms-outgoing-queue-limit)
     OCTSTR(sms-resend-freq)
     OCTSTR(sms-resend-retry)
+    OCTSTR(sms-combine-concatenated-mo)
+    OCTSTR(sms-combine-concatenated-mo-timeout)
 )
 
 

Reply via email to