Hi,
the attached patch implements an outgoing queue limit for bearerbox. Currently
bearerbox has no outgoing queue limit at all, so it is even possible to drive
bearerbox out of memory (OOM).
The outgoing queue limit is not strict, but it tries to be. A strict limit
could be implemented, but it would have a big performance impact, because all
queues (every SMSC queue and the global queue) would have to be locked.
This implementation does the following (if sms-outgoing-queue-limit is set):
1) when routing an MT message, it ensures that the sum of all SMSC queues plus
the global queue is below the limit (sms-outgoing-queue-limit). If the limit is
exceeded, the message is rejected with ack_failed_tmp.
2) if the global queue is not empty, the queue limit is split: 80% for new
messages coming from smsbox and 20% for old messages coming from the global
queue on retry. This ensures that retried/old messages always find room in
the SMSC queue.
This implementation has been working very well in production for two years now.
Please review, test and vote.
--
Thanks,
Alex
=== doc/userguide/userguide.xml
==================================================================
--- doc/userguide/userguide.xml (revision 274)
+++ doc/userguide/userguide.xml (local)
@@ -1488,6 +1488,14 @@
queues grow very long).
</entry></row>
+ <row><entry><literal>sms-outgoing-queue-limit</literal></entry>
+ <entry>number of messages</entry>
+ <entry valign="bottom">
+ Sets the maximum size of the outgoing message queue. Once the number
+ of queued messages reaches this value, Kannel rejects new messages.
+ The default value, -1, means the queue length is unlimited.
+ </entry></row>
+
<row><entry><literal>white-list-regex</literal></entry>
<entry>POSIX regular expression</entry>
<entry valign="bottom">
=== gw/bb_boxc.c
==================================================================
--- gw/bb_boxc.c (revision 274)
+++ gw/bb_boxc.c (local)
@@ -210,7 +210,7 @@
*/
static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
{
- Msg *mack, *mack_store;
+ Msg *mack;
int rc;
/*
@@ -224,27 +224,24 @@
store_save(msg);
- rc = smsc2_rout(msg);
+ rc = smsc2_rout(msg, 0);
switch(rc) {
- case 1:
+ case SMSCCONN_SUCCESS:
mack->ack.nack = ack_success;
break;
- case 0:
+ case SMSCCONN_QUEUED:
mack->ack.nack = ack_buffered;
break;
- case -1: /* no router at all */
- warning(0, "Message rejected by bearerbox, no router!");
+ case SMSCCONN_FAILED_DISCARDED: /* no router at all */
+ case SMSCCONN_FAILED_QFULL: /* queue full */
+ warning(0, "Message rejected by bearerbox, %s!",
+ (rc == SMSCCONN_FAILED_DISCARDED) ? "no router" : "queue full");
/*
* first create nack for store-file, in order to delete
* message from store-file.
*/
- mack_store = msg_create(ack);
- gw_assert(mack_store != NULL);
- uuid_copy(mack_store->ack.id, msg->sms.id);
- mack_store->ack.time = msg->sms.time;
- mack->ack.nack = mack_store->ack.nack = ack_failed;
- store_save(mack_store);
- msg_destroy(mack_store);
+ store_save_ack(msg, (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed));
+ mack->ack.nack = (rc == SMSCCONN_FAILED_QFULL ? ack_failed_tmp : ack_failed);
/* destroy original message */
msg_destroy(msg);
=== gw/bb_smscconn.c
==================================================================
--- gw/bb_smscconn.c (revision 274)
+++ gw/bb_smscconn.c (local)
@@ -104,6 +104,9 @@
extern List *suspended;
extern List *isolated;
+/* outgoing sms queue control */
+extern long max_outgoing_sms_qlength;
+
/* our own thingies */
static volatile sig_atomic_t smsc_running;
@@ -381,6 +384,7 @@
return SMSCCONN_FAILED_REJECTED;
}
+ /* fix sms type if not set already */
if (sms->sms.sms_type != report_mo)
sms->sms.sms_type = mo;
@@ -402,28 +406,29 @@
* the registered receiver numbers for specific smsbox'es.
* Scope: external routing (to smsbox connections)
*/
- if (route_incoming_to_boxc(copy) == -1) {
- warning(0, "incoming messages queue too long, dropping a message.");
- if (sms->sms.sms_type == report_mo)
- bb_alog_sms(conn, sms, "DROPPED Received DLR");
- else
- bb_alog_sms(conn, sms, "DROPPED Received SMS");
+ rc = route_incoming_to_boxc(copy);
+ }
+
+ if (rc == -1 || (rc != SMSCCONN_SUCCESS && rc != SMSCCONN_QUEUED)) {
+ warning(0, "incoming messages queue too long, dropping a message");
+ if (sms->sms.sms_type == report_mo)
+ bb_alog_sms(conn, sms, "DROPPED Received DLR");
+ else
+ bb_alog_sms(conn, sms, "DROPPED Received SMS");
- /* put nack into store-file */
- store_save_ack(sms, ack_failed);
+ /* put nack into store-file */
+ store_save_ack(sms, ack_failed);
- msg_destroy(copy);
-
- msg_destroy(sms);
- gwthread_sleep(0.1); /* letting the queue go down */
- return SMSCCONN_FAILED_QFULL;
- }
+ msg_destroy(copy);
+ msg_destroy(sms);
+ gwthread_sleep(0.1); /* letting the queue go down */
+ return (rc == -1 ? SMSCCONN_FAILED_QFULL : rc);
}
if (sms->sms.sms_type != report_mo)
bb_alog_sms(conn, sms, "Receive SMS");
else
- bb_alog_sms(conn, sms, "DLR SMS");
+ bb_alog_sms(conn, sms, "Receive DLR");
counter_increase(incoming_sms_counter);
if (conn != NULL) counter_increase(conn->received);
@@ -445,22 +450,20 @@
*/
static void sms_router(void *arg)
{
- Msg *msg, *newmsg, *startmsg;
- int ret;
-
+ Msg *msg, *startmsg, *newmsg;
+ long ret;
+
gwlist_add_producer(flow_threads);
gwthread_wakeup(MAIN_THREAD_ID);
- newmsg = startmsg = NULL;
- ret = 0;
-
+ startmsg = newmsg = NULL;
+ ret = SMSCCONN_SUCCESS;
+
while(bb_status != BB_DEAD) {
+
if (newmsg == startmsg) {
- if (ret != 1) {
- /*
- * In order to reduce amount of msgs to send we sleep only the half of frequency time
- * but at least 1 second.
- */
+ if (ret == SMSCCONN_QUEUED || ret == SMSCCONN_FAILED_QFULL) {
+ /* sleep: sms_resend_frequency / 2 , so we reduce amount of msgs to send */
double sleep_time = (sms_resend_frequency / 2 > 1 ? sms_resend_frequency / 2 : sms_resend_frequency);
debug("bb.sms", 0, "sms_router: time to sleep %.2f secs.", sleep_time);
gwthread_sleep(sleep_time);
@@ -485,18 +488,28 @@
bb_status != BB_SHUTDOWN && bb_status != BB_DEAD) {
debug("bb.sms", 0, "re-queing SMS not-yet-to-be resent");
gwlist_produce(outgoing_sms, msg);
- ret = 0;
+ ret = SMSCCONN_QUEUED;
continue;
}
- ret = smsc2_rout(msg);
- if (ret == -1) {
- warning(0, "No SMSCes to receive message, discarding it!");
- bb_smscconn_send_failed(NULL, msg, SMSCCONN_FAILED_DISCARDED,
- octstr_create("DISCARDED"));
- } else if (ret == 1) {
- newmsg = startmsg = NULL;
- }
+ ret = smsc2_rout(msg, 1);
+ switch(ret) {
+ case SMSCCONN_SUCCESS:
+ debug("bb.sms", 0, "Message routed successfully.");
+ newmsg = startmsg = NULL;
+ break;
+ case SMSCCONN_QUEUED:
+ debug("bb.sms", 0, "Routing failed, re-queued.");
+ break;
+ case SMSCCONN_FAILED_DISCARDED:
+ msg_destroy(msg);
+ newmsg = startmsg = NULL;
+ break;
+ case SMSCCONN_FAILED_QFULL:
+ debug("bb.sms", 0, "Routing failed, re-queuing.");
+ gwlist_produce(outgoing_sms, msg);
+ break;
+ }
}
gwlist_remove_producer(flow_threads);
}
@@ -521,6 +534,7 @@
/* create split sms counter */
split_msg_counter = counter_create();
+ /* create smsc list and rwlock for it */
smsc_list = gwlist_create();
gw_rwlock_init_static(&smsc_list_lock);
@@ -923,64 +937,87 @@
/* function to route outgoing SMS'es
*
- * If finds a good one, puts into it and returns 1
+ * If finds a good one, puts into it and returns SMSCCONN_SUCCESS
* If finds only bad ones, but acceptable, queues and
- * returns 0 (like all acceptable currently disconnected)
- * If cannot find nothing at all, returns -1 and
+ * returns SMSCCONN_QUEUED (like all acceptable currently disconnected)
+ * if message acceptable but queues full returns SMSCCONN_FAILED_QFULL and
+ * message is not destroyed.
+ * If cannot find nothing at all, returns SMSCCONN_FAILED_DISCARDED and
* message is NOT destroyed (otherwise it is)
*/
-int smsc2_rout(Msg *msg)
+long smsc2_rout(Msg *msg, int resend)
{
StatusInfo info;
SMSCConn *conn, *best_preferred, *best_ok;
long bp_load, bo_load;
- int i, s, ret, bad_found;
+ int i, s, ret, bad_found, full_found;
+ long max_queue, queue_length;
char *uf;
- bp_load = bo_load = 0;
+ /* XXX handle ack here? */
+ if (msg_type(msg) != sms) {
+ error(0, "Attempt to route non SMS message through smsc2_rout!");
+ return SMSCCONN_FAILED_DISCARDED;
+ }
- /* XXX handle ack here? */
- if (msg_type(msg) != sms)
- return -1;
-
/* unify prefix of receiver, in case of it has not been
* already done */
uf = unified_prefix ? octstr_get_cstr(unified_prefix) : NULL;
normalize_number(uf, &(msg->sms.receiver));
-
+
/* select in which list to add this
* start - from random SMSCConn, as they are all 'equal'
*/
gw_rwlock_rdlock(&smsc_list_lock);
if (gwlist_len(smsc_list) == 0) {
- warning(0, "No SMSCes to receive message");
+ warning(0, "No SMSCes to receive message");
gw_rwlock_unlock(&smsc_list_lock);
- return -1;
+ return SMSCCONN_FAILED_DISCARDED;
}
+ /*
+ * if global queue not empty then 20% reserved for old msgs
+ * and 80% for new msgs. So we can guarantee that old msgs find
+ * place in the SMSC's queue.
+ */
+ if (max_outgoing_sms_qlength > 0 && gwlist_len(outgoing_sms) > 0) {
+ max_queue = (resend ? max_outgoing_sms_qlength :
+ max_outgoing_sms_qlength * 0.8);
+ }
+ else
+ max_queue = (max_outgoing_sms_qlength > 0 ? max_outgoing_sms_qlength : 1000000);
+
s = gw_rand() % gwlist_len(smsc_list);
best_preferred = best_ok = NULL;
- bad_found = 0;
-
+ bad_found = full_found = 0;
+ bp_load = bo_load = queue_length = 0;
+
conn = NULL;
for (i=0; i < gwlist_len(smsc_list); i++) {
conn = gwlist_get(smsc_list, (i+s) % gwlist_len(smsc_list));
- ret = smscconn_usable(conn,msg);
+ smscconn_info(conn, &info);
+ queue_length += (info.queued > 0 ? info.queued : 0);
+
+ ret = smscconn_usable(conn,msg);
if (ret == -1)
continue;
/* if we already have a preferred one, skip non-preferred */
- if (ret != 1 && best_preferred)
+ if (ret != 1 && best_preferred)
continue;
- smscconn_info(conn, &info);
- /* If connection is not currently answering... */
+ /* If connection is not currently answering ... */
if (info.status != SMSCCONN_ACTIVE) {
bad_found = 1;
continue;
}
+ /* check queue length */
+ if (info.queued > max_queue) {
+ full_found = 1;
+ continue;
+ }
if (ret == 1) { /* preferred */
if (best_preferred == NULL || info.load < bp_load) {
best_preferred = conn;
@@ -993,32 +1030,51 @@
bo_load = info.load;
}
}
+ queue_length += gwlist_len(outgoing_sms);
+ if (max_outgoing_sms_qlength > 0 && !resend &&
+ queue_length > gwlist_len(smsc_list) * max_outgoing_sms_qlength) {
+ gw_rwlock_unlock(&smsc_list_lock);
+ debug("bb.sms", 0, "sum(#queues) limit");
+ return SMSCCONN_FAILED_QFULL;
+ }
if (best_preferred)
ret = smscconn_send(best_preferred, msg);
else if (best_ok)
ret = smscconn_send(best_ok, msg);
else if (bad_found) {
- if (bb_status != BB_SHUTDOWN)
- gwlist_produce(outgoing_sms, msg);
gw_rwlock_unlock(&smsc_list_lock);
- return 0;
+ if (max_outgoing_sms_qlength < 0 || gwlist_len(outgoing_sms) < max_outgoing_sms_qlength) {
+ gwlist_produce(outgoing_sms, msg);
+ return SMSCCONN_QUEUED;
+ }
+ debug("bb.sms", 0, "bad_found queue full");
+ return SMSCCONN_FAILED_QFULL; /* queue full */
}
+ else if (full_found) {
+ gw_rwlock_unlock(&smsc_list_lock);
+ debug("bb.sms", 0, "full_found queue full");
+ return SMSCCONN_FAILED_QFULL;
+ }
else {
gw_rwlock_unlock(&smsc_list_lock);
- if (bb_status == BB_SHUTDOWN)
- return 0;
- warning(0, "Cannot find SMSCConn for message to <%s>, rejected.",
- octstr_get_cstr(msg->sms.receiver));
- return -1;
+ if (bb_status == BB_SHUTDOWN) {
+ msg_destroy(msg);
+ return SMSCCONN_QUEUED;
+ }
+ warning(0, "Cannot find SMSCConn for message to <%s>, rejected.",
+ octstr_get_cstr(msg->sms.receiver));
+ bb_smscconn_send_failed(NULL, msg_duplicate(msg), SMSCCONN_FAILED_DISCARDED, octstr_create("no SMSC"));
+ return SMSCCONN_FAILED_DISCARDED;
}
+
gw_rwlock_unlock(&smsc_list_lock);
/* check the status of sending operation */
if (ret == -1)
- return (smsc2_rout(msg)); /* re-try */
+ return smsc2_rout(msg, resend); /* re-try */
msg_destroy(msg);
- return 1;
+ return SMSCCONN_SUCCESS;
}
@@ -1048,7 +1104,7 @@
msg->sms.sms_type = mt_push;
store_save(msg);
/* drop into outbound queue again for routing */
- return smsc2_rout(msg);
+ return smsc2_rout(msg, 0);
}
if (conn->reroute_to_smsc) {
@@ -1059,7 +1115,7 @@
/* apply directly to the given smsc-id for MT traffic */
octstr_destroy(msg->sms.smsc_id);
msg->sms.smsc_id = octstr_duplicate(conn->reroute_to_smsc);
- return smsc2_rout(msg);
+ return smsc2_rout(msg, 0);
}
if (conn->reroute_by_receiver && msg->sms.receiver &&
@@ -1072,7 +1128,7 @@
/* XXX implement wildcard matching too! */
octstr_destroy(msg->sms.smsc_id);
msg->sms.smsc_id = octstr_duplicate(smsc);
- return smsc2_rout(msg);
+ return smsc2_rout(msg, 0);
}
return -1;
=== gw/bb_smscconn_cb.h
==================================================================
--- gw/bb_smscconn_cb.h (revision 274)
+++ gw/bb_smscconn_cb.h (local)
@@ -106,6 +106,7 @@
enum {
SMSCCONN_SUCCESS = 0,
+ SMSCCONN_QUEUED,
SMSCCONN_FAILED_SHUTDOWN,
SMSCCONN_FAILED_REJECTED,
SMSCCONN_FAILED_MALFORMED,
=== gw/bearerbox.c
==================================================================
--- gw/bearerbox.c (revision 274)
+++ gw/bearerbox.c (local)
@@ -93,7 +93,9 @@
/* incoming/outgoing sms queue control */
long max_incoming_sms_qlength;
+long max_outgoing_sms_qlength;
+
/* this is not a list of items; instead it is used as
* indicator to note how many threads we have.
* ALL flow threads must exit before we may safely change
@@ -179,13 +181,13 @@
case SIGHUP:
bb_todo |= BB_LOGREOPEN;
break;
-
- /*
+
+ /*
* It would be more proper to use SIGUSR1 for this, but on some
- * platforms that's reserved by the pthread support.
+ * platforms that's reserved by the pthread support.
*/
case SIGQUIT:
- bb_todo |= BB_CHECKLEAKS;
+ bb_todo |= BB_CHECKLEAKS;
break;
}
}
@@ -415,7 +417,7 @@
conn_config_ssl (grp);
- /*
+ /*
* Make sure we have "ssl-server-cert-file" and "ssl-server-key-file" specified
* in the core group since we need it to run SSL-enabled internal box
* connections configured via "smsbox-port-ssl = yes" and "wapbox-port-ssl = yes".
@@ -437,9 +439,9 @@
octstr_destroy(ssl_server_cert_file);
octstr_destroy(ssl_server_key_file);
#endif /* HAVE_LIBSSL */
-
+
/* if all seems to be OK by the first glimpse, real start-up */
-
+
outgoing_sms = gwlist_create();
incoming_sms = gwlist_create();
outgoing_wdp = gwlist_create();
@@ -449,7 +451,7 @@
incoming_sms_counter = counter_create();
outgoing_wdp_counter = counter_create();
incoming_wdp_counter = counter_create();
-
+
status_mutex = mutex_create();
setup_signal_handlers();
@@ -469,6 +471,10 @@
cfg_get_integer(&max_incoming_sms_qlength, grp,
octstr_imm("sms-incoming-queue-limit")) == -1)
max_incoming_sms_qlength = -1;
+
+ if (cfg_get_integer(&max_outgoing_sms_qlength, grp,
+ octstr_imm("sms-outgoing-queue-limit")) == -1)
+ max_outgoing_sms_qlength = -1;
#ifndef NO_SMS
{
=== gw/bearerbox.h
==================================================================
--- gw/bearerbox.h (revision 274)
+++ gw/bearerbox.h (local)
@@ -155,13 +155,19 @@
void smsc2_cleanup(void); /* final clean-up */
Octstr *smsc2_status(int status_type);
-/* Route message to SMSC. If finds a good one, puts into it and returns 1
- * If finds only bad ones, but acceptable, queues and returns 0
- * (like all acceptable currently disconnected)
- * If cannot find nothing at all, returns -1 and message is NOT destroyed
- * (otherwise it is) */
-int smsc2_rout(Msg *msg);
+/* function to route outgoing SMS'es
+ *
+ * If finds a good one, puts into it and returns SMSCCONN_SUCCESS
+ * If finds only bad ones, but acceptable, queues and
+ * returns SMSCCONN_QUEUED (like all acceptable currently disconnected)
+ * if message acceptable but queues full returns SMSCCONN_FAILED_QFULL and
+ * message is not destroyed.
+ * If cannot find nothing at all, returns SMSCCONN_FAILED_DISCARDED and
+ * message is NOT destroyed (otherwise it is)
+ */
+long smsc2_rout(Msg *msg, int resend);
+
int smsc2_stop_smsc(Octstr *id); /* shutdown a specific smsc */
int smsc2_restart_smsc(Octstr *id); /* re-start a specific smsc */
=== gwlib/cfg.def
==================================================================
--- gwlib/cfg.def (revision 274)
+++ gwlib/cfg.def (local)
@@ -120,6 +120,7 @@
OCTSTR(dlr-storage)
OCTSTR(maximum-queue-length)
OCTSTR(sms-incoming-queue-limit)
+ OCTSTR(sms-outgoing-queue-limit)
OCTSTR(sms-resend-freq)
OCTSTR(sms-resend-retry)
)