Index: gw/smppbox.c
===================================================================
--- gw/smppbox.c	(revision 48)
+++ gw/smppbox.c	(working copy)
@@ -142,9 +142,9 @@
     time_t	connect_time;
     Counter	*smpp_pdu_counter;
     Octstr	*client_ip;
-    List	*incoming;
+    gw_prioqueue_t	*incoming;	/* from esme to smppbox */
+    gw_prioqueue_t	*outgoing;	/* from bearerbox to smppbox */
     List	*retry;   	/* If sending fails */
-    List	*outgoing;
     Dict	*sent;
     Semaphore	*pending;
     volatile sig_atomic_t alive;
@@ -418,6 +418,16 @@
     return msg;
 }
 
+void smpp_incoming(void *arg)
+{
+	Boxc *box = (Boxc *)arg;
+	Msg *msg;
+
+	while (box->alive && (msg = read_from_box(box->bearerbox_connection, box)) != NULL) {
+		gw_prioqueue_produce(box->outgoing, msg);
+	}
+}
+
 Msg *catenate_msg(List *list, int total)
 {
 	int current = 1, partno = 1, thismsg, max = 0;
@@ -1590,6 +1600,126 @@
  *
 */
 
+static int alt_sms_priority_compare(const void *a, const void *b)
+{
+    int ret;
+    Msg *msg1 = (Msg *)a, *msg2 = (Msg *)b;
+
+    switch (msg_type(msg1)) {
+    case heartbeat:
+	switch (msg_type(msg2)) {
+	case heartbeat:
+	    ret = 0;
+	    break;
+	case admin:
+	    ret = -1;
+	    break;
+	case sms:
+	    ret = -1;
+	    break;
+	case ack:
+	    ret = -1;
+	    break;
+	case wdp_datagram:
+	    ret = 1;
+	    break;
+	}
+	break;
+    case admin:
+	switch (msg_type(msg2)) {
+	case heartbeat:
+	    ret = 1;
+	    break;
+	case admin:
+	    ret = 0;
+	    break;
+	case sms:
+	    ret = -1;
+	    break;
+	case ack:
+	    ret = 1;
+	    break;
+	case wdp_datagram:
+	    ret = 1;
+	    break;
+	}
+	break;
+    case sms:
+	switch (msg_type(msg2)) {
+	case heartbeat:
+	    ret = 1;
+	    break;
+	case admin:
+	    ret = 1;
+	    break;
+	case sms:
+	    ret = sms_priority_compare(a, b);
+	    break;
+	case ack:
+	    ret = 1;
+	    break;
+	case wdp_datagram:
+	    ret = 1;
+	    break;
+	}
+	break;
+    case ack:
+	switch (msg_type(msg2)) {
+	case heartbeat:
+	    ret = 1;
+	    break;
+	case admin:
+	    ret = -1;
+	    break;
+	case sms:
+	    ret = -1;
+	    break;
+	case ack:
+	    if (msg1->ack.time > msg2->ack.time)
+		ret = 1;
+	    else if (msg1->ack.time < msg2->ack.time)
+		ret = -1;
+	    else
+		ret = 0;
+	    break;
+	case wdp_datagram:
+	    ret = 1;
+	    break;
+	}
+	break;
+    case wdp_datagram:
+	switch (msg_type(msg2)) {
+	case heartbeat:
+	    ret = -1;
+	    break;
+	case admin:
+	    ret = -1;
+	    break;
+	case sms:
+	    ret = -1;
+	    break;
+	case ack:
+	    ret = -1;
+	    break;
+	case wdp_datagram:
+	    ret = 0;
+	    break;
+	}
+	break;
+    }
+    return ret;
+}
+
+static int pdu_priority_compare(const void *p1, const void *p2)
+{
+	SMPP_PDU *pdu1 = (SMPP_PDU *)p1;
+	SMPP_PDU *pdu2 = (SMPP_PDU *)p2;
+
+	if (pdu1->u.submit_sm.priority_flag == pdu2->u.submit_sm.priority_flag) return 0;
+	if (pdu1->u.submit_sm.priority_flag > pdu2->u.submit_sm.priority_flag) return 1;
+	return -1;
+}
+
 static Boxc *boxc_create(int fd, Octstr *ip, int ssl)
 {
     Boxc *boxc;
@@ -1620,6 +1750,11 @@
     boxc->dest_addr_ton = smpp_dest_addr_ton;
     boxc->dest_addr_npi = smpp_dest_addr_npi;
 
+    boxc->incoming = gw_prioqueue_create(pdu_priority_compare);
+    gw_prioqueue_add_producer(boxc->incoming);
+    boxc->outgoing = gw_prioqueue_create(alt_sms_priority_compare);
+    gw_prioqueue_add_producer(boxc->outgoing);
+
     boxc->alt_dcs = 0;
     boxc->validityperiod = -1;	
     boxc->priority = 0;
@@ -1651,6 +1786,10 @@
 	    octstr_destroy(boxc->client_ip);
     dict_destroy(boxc->msg_acks);
     dict_destroy(boxc->deliver_acks);
+    gw_prioqueue_remove_producer(boxc->incoming);
+    gw_prioqueue_destroy(boxc->incoming, NULL);
+    gw_prioqueue_remove_producer(boxc->outgoing);
+    gw_prioqueue_destroy(boxc->outgoing, NULL);
     gw_free(boxc);
 }
 
@@ -1699,6 +1838,17 @@
 static void smpp_to_bearerbox(void *arg)
 {
     Boxc *box = arg;
+    SMPP_PDU *pdu;
+
+    while (box->alive && (pdu = gw_prioqueue_consume(box->incoming)) != NULL) {
+	handle_pdu(box->smpp_connection, box, pdu);
+    }
+
+}
+
+void smpp_outgoing(void *arg)
+{
+    Boxc *box = (Boxc *)arg;
     Connection *conn = box->smpp_connection;
     SMPP_PDU *pdu;
     long len;
@@ -1720,13 +1870,10 @@
 			break;
 		case 1:
 			box->last_pdu_received = time(NULL);
-			handle_pdu(conn, box, pdu);
+			gw_prioqueue_produce(box->incoming, pdu);
 			break;
 		}
     }
-#ifdef HAVE_SHUTDOWN_CONNECTION
-    shutdown_connection(box->bearerbox_connection);
-#endif
 }
 
 /* if this login was made as a transmitter, then find the corresponding receiver connection */
@@ -1760,7 +1907,7 @@
 
     while (smppbox_status == SMPP_RUNNING && box->alive) {
 
-	msg = read_from_box(box->bearerbox_connection, box);
+	msg = gw_prioqueue_consume(box->outgoing);
         if (msg == NULL) {
 	    if ((!box->alive) || conn_eof(box->bearerbox_connection)) {
             	/* tell smppbox to die */
@@ -1930,7 +2077,7 @@
 {
     int fd;
     Boxc *newconn;
-    long sender;
+    long sender, inthread, outthread;
     Msg *msg;
 
     fd = (int)arg;
@@ -1958,6 +2105,8 @@
     }
 #endif
 
+    inthread = gwthread_create(smpp_incoming, newconn);
+    outthread = gwthread_create(smpp_outgoing, newconn);
     sender = gwthread_create(smpp_to_bearerbox, newconn);
     if (sender == -1) {
 	    error(0, "Failed to start a new thread, disconnecting client <%s>",
@@ -1967,6 +2116,8 @@
     }
     bearerbox_to_smpp(newconn);
     gwthread_join(sender);
+    gwthread_join(inthread);
+    gwthread_join(outthread);
     gwlist_delete_equal(all_boxes, newconn);
     boxc_destroy(newconn);
 }
