Well, I missed the obvious implementation, the one done by Fred
(msg_duplicate). Instead, I modified many parts of the code.
Besides, my tree has some other patches applied (which I have already sent,
namely the patch for "bulk retry"), so they show up in the diff as well.
The diff -ub against CVS HEAD is attached.
I really think Fred's approach is the way to go; I will modify my source to
use msg_duplicate as soon as I have the time. I think I will also send
separate diffs for bulk retry and ack-after-sent at that time.
Cheers,
On 12/13/05, Hillel <[EMAIL PROTECTED]> wrote:
> Dear Fred and Juan,
>
> I read your posts with interest to the devel list where Fred wrote:
>
> >
> > I tested it with a smpp simulator (logica) probably done 1mill+ mesgs
> > thru it already......
> > killed smsbox with big queue of messages, and all recovers on restarting
> > smsbox!!!
>
> Please will both/either of you send your code to the Kannel devel list so we
> can all test by installing your code, as it's very important to know if you
> kill the smsbox you will not lose any SMSs. It will be great if one of your
> approaches to resolve this issue will be added to CVS.
>
> Rgds
>
>
>
--
Juan
diff -r -ub gateway/gw/smsbox.c ../src-to-diff/gw/smsbox.c
--- gateway/gw/smsbox.c Thu Dec 8 23:14:31 2005
+++ ../src-to-diff/gw/smsbox.c Fri Dec 16 18:38:26 2005
@@ -66,6 +66,8 @@
#include "gwlib/gwlib.h"
#include "gwlib/regex.h"
+#include "gwlib/gw_uuid_types.h"
+#include "gwlib/gw_uuid.h"
#include "msg.h"
#include "sms.h"
#include "dlr.h"
@@ -78,7 +80,6 @@
#include "ota_prov.h"
#include "ota_compiler.h"
#include "xml_shared.h"
-
#ifdef HAVE_SECURITY_PAM_APPL_H
#include <security/pam_appl.h>
#endif
@@ -91,6 +92,7 @@
/* Defaults for the HTTP request queueing inside http_queue_thread */
#define HTTP_MAX_RETRIES 0
#define HTTP_RETRY_DELAY 10 /* in sec. */
+#define BULK_RETRY_Q 10 /* How many should we move to incoming */
/* have we received restart cmd from bearerbox? */
volatile sig_atomic_t restart = 0;
@@ -130,6 +132,8 @@
int charset_processing (Octstr *charset, Octstr *text, int coding);
static long get_tag(Octstr *body, Octstr *tag, Octstr **value, long pos, int nostrip);
+static long bulk_retry_q = BULK_RETRY_Q;
+
/* for delayed HTTP answers.
* Dict key is uuid, value is HTTPClient pointer
* of open transaction
@@ -143,7 +147,21 @@
* Communication with the bearerbox.
*/
+static void send_ack(uuid_t uuid, int ack_type, Msg *msg){ /* Build an ack Msg for `uuid` with status `ack_type` and pass it to write_to_bearerbox(); `msg` is only read for msg->sms.time and is dereferenced without a NULL check. */
+ Octstr *os; /* temporary holding the UUID string; used only for the debug line below */
+ char my_id[UUID_STR_LEN + 1]; /* buffer for the text form of uuid (+1, presumably for the terminating NUL) */
+ Msg *mack; /* the ack message being built */
+ uuid_unparse(uuid, my_id); /* fill my_id from uuid so it can be logged with %s */
+ os = octstr_create(my_id);
+ debug("smsbox.c", 0, "Sending ack id = %s", octstr_get_cstr(os));
+ octstr_destroy(os); /* os is not needed past the log call */
+ mack = msg_create(ack);
+ mack->ack.nack = ack_type; /* caller-supplied status; callers in this patch pass ack_success or ack_failed */
+ mack->ack.time = msg->sms.time; /* the ack carries the timestamp of the message being acknowledged */
+ uuid_copy(mack->ack.id,uuid); /* the ack is tagged with the same id as the original message */
+ write_to_bearerbox(mack); /* implicit msg_destroy */
+}
/*
* Identify ourself to bearerbox for smsbox-specific routing inside bearerbox.
* Do this even while no smsbox-id is given to unlock the sender thread in
@@ -403,6 +421,7 @@
List *http_headers;
Octstr *body; /* body content of the request */
unsigned long retries; /* number of performed retries */
+ uuid_t uuid;
};
/*
@@ -410,13 +429,12 @@
*/
static void *remember_receiver(Msg *msg, URLTranslation *trans, int method,
Octstr *url, List *headers, Octstr *body,
- unsigned int retries)
+ unsigned int retries, uuid_t uuid)
{
struct receiver *receiver;
-
counter_increase(num_outstanding_requests);
receiver = gw_malloc(sizeof(*receiver));
-
+ uuid_copy(receiver->uuid, uuid);
receiver->msg = msg_create(sms);
receiver->msg->sms.sender = octstr_duplicate(msg->sms.sender);
@@ -447,7 +465,7 @@
static void get_receiver(void *id, Msg **msg, URLTranslation **trans, int *method,
Octstr **url, List **headers, Octstr **body,
- unsigned long *retries)
+ unsigned long *retries, uuid_t uuid)
{
struct receiver *receiver;
@@ -459,6 +477,7 @@
*headers = receiver->http_headers;
*body = receiver->body;
*retries = receiver->retries;
+ uuid_copy(uuid, receiver->uuid);
gw_free(receiver);
counter_decrease(num_outstanding_requests);
}
@@ -1040,6 +1059,8 @@
Octstr *req_body;
unsigned long retries;
int method;
+ long bulk_i;
+ uuid_t uuid;
while ((id = gwlist_consume(smsbox_http_requests)) != NULL) {
/*
@@ -1052,28 +1073,35 @@
debug("sms.http",0,"HTTP: Queue contains %ld outstanding requests",
gwlist_len(smsbox_http_requests));
+ debug("sms.http",0,"HTTP: Incoming queue contains %ld outstanding requests", gwlist_len(smsbox_requests));
+ bulk_i = 0;
+ do{
/*
* Get all required HTTP request data from the queue and reconstruct
* the id pointer for later lookup in url_result_thread.
*/
- get_receiver(id, &msg, &trans, &method, &req_url, &req_headers, &req_body, &retries);
+ get_receiver(id, &msg, &trans, &method, &req_url, &req_headers, &req_body, &retries, uuid);
if (retries < max_http_retries) {
- id = remember_receiver(msg, trans, method, req_url, req_headers, req_body, ++retries);
+ id = remember_receiver(msg, trans, method, req_url, req_headers, req_body, ++retries, uuid);
- debug("sms.http",0,"HTTP: Retrying request <%s> (%ld/%ld)",
+ info(0,"HTTP: Retrying request <%s> (%ld/%ld)",
octstr_get_cstr(req_url), retries, max_http_retries);
/* re-queue this request to the HTTPCaller list */
http_start_request(caller, method, req_url, req_headers, req_body,
1, id, NULL);
+ }else{
+ warning(0, "Max tries (%d) on http_queue_thread", max_http_retries);
+ send_ack(uuid, ack_failed, msg);
}
msg_destroy(msg);
octstr_destroy(req_url);
http_destroy_headers(req_headers);
octstr_destroy(req_body);
+ }while((++bulk_i < bulk_retry_q) && ((id = gwlist_consume(smsbox_http_requests)) != NULL));
}
}
@@ -1093,11 +1121,11 @@
int octets;
unsigned long retries;
unsigned int queued; /* indicate if processes reply is requeued */
-
Octstr *reply_body, *charset;
Octstr *udh, *from, *to, *dlr_url, *account, *smsc, *binfo;
int dlr_mask, mclass, mwi, coding, compress, pid, alt_dcs, rpi;
int validity, deferred, priority;
+ uuid_t uuid;
text_html = octstr_imm("text/html");
text_wml = octstr_imm("text/vnd.wap.wml");
@@ -1117,9 +1145,11 @@
mclass = mwi = coding = compress = pid = alt_dcs = rpi = dlr_mask
= validity = deferred = priority = SMS_PARAM_UNDEFINED;
- get_receiver(id, &msg, &trans, &method, &req_url, &req_headers, &req_body, &retries);
+ get_receiver(id, &msg, &trans, &method, &req_url, &req_headers, &req_body, &retries, uuid);
if (status == HTTP_OK || status == HTTP_ACCEPTED) {
+ send_ack(uuid, ack_success, msg);
+
http_header_get_content_type(reply_headers, &type, &charset);
if (octstr_case_compare(type, text_html) == 0 ||
octstr_case_compare(type, text_wml) == 0) {
@@ -1173,12 +1203,14 @@
}
octstr_destroy(type);
} else if (max_http_retries > retries) {
- id = remember_receiver(msg, trans, method, req_url, req_headers, req_body, retries);
+ id = remember_receiver(msg, trans, method, req_url, req_headers, req_body, retries, uuid);
gwlist_produce(smsbox_http_requests, id);
queued++;
goto requeued;
- } else
+ } else{
replytext = octstr_duplicate(reply_couldnotfetch);
+ send_ack(uuid, ack_failed, msg);
+ }
fill_message(msg, trans, replytext, octets, from, to, udh, mclass,
mwi, coding, compress, validity, deferred, dlr_url,
@@ -1304,7 +1336,9 @@
}
}
- id = remember_receiver(msg, trans, HTTP_METHOD_GET, pattern, request_headers, NULL, 0);
+ id = remember_receiver(msg, trans, HTTP_METHOD_GET, pattern, request_headers, NULL, 0, msg->sms.id);
+ requests_started++;
+
http_start_request(caller, HTTP_METHOD_GET, pattern, request_headers,
NULL, 1, id, NULL);
octstr_destroy(pattern);
@@ -1443,7 +1477,7 @@
octstr_get_cstr(os));
octstr_destroy(os);
}
- id = remember_receiver(msg, trans, HTTP_METHOD_POST, pattern, request_headers, msg->sms.msgdata, 0);
+ id = remember_receiver(msg, trans, HTTP_METHOD_POST, pattern, request_headers, msg->sms.msgdata, 0, msg->sms.id);
http_start_request(caller, HTTP_METHOD_POST, pattern, request_headers,
msg->sms.msgdata, 1, id, NULL);
octstr_destroy(pattern);
@@ -1609,7 +1643,7 @@
msg->sms.msgdata = xml;
debug("sms", 0, "XMLBuild: XML: <%s>", octstr_get_cstr(msg->sms.msgdata));
- id = remember_receiver(msg, trans, HTTP_METHOD_POST, pattern, request_headers, msg->sms.msgdata, 0);
+ id = remember_receiver(msg, trans, HTTP_METHOD_POST, pattern, request_headers, msg->sms.msgdata, 0, msg->sms.id);
http_start_request(caller, HTTP_METHOD_POST, pattern, request_headers,
msg->sms.msgdata, 1, id, NULL);
octstr_destroy(pattern);
@@ -1809,8 +1843,13 @@
msg_destroy(reply_msg);
}
}
-
+ if((ret == 0) && ((msg_type(msg) == sms) && ((msg->sms.sms_type == report_mo) ||
+ (urltrans_type(trans) == TRANSTYPE_GET_URL)))){
+ // Queued for HTTP processing
+ msg_destroy(mack);
+ }else{
write_to_bearerbox(mack); /* implicit msg_destroy */
+ }
msg_destroy(msg);
}
@@ -3303,6 +3342,7 @@
Octstr *http_proxy_exceptions_regex = NULL;
int ssl = 0;
int lf, m;
+ long bulk_q;
bb_port = BB_DEFAULT_SMSBOX_PORT;
bb_ssl = 0;
@@ -3414,6 +3454,10 @@
#ifdef HAVE_LIBSSL
cfg_get_bool(&ssl, grp, octstr_imm("sendsms-port-ssl"));
#endif /* HAVE_LIBSSL */
+
+ cfg_get_integer(&bulk_q, grp, octstr_imm("bulk-retry-q"));
+ if(bulk_q > 0)
+ bulk_retry_q = bulk_q;
/*
* load the configuration settings for the sendsms and sendota URIs
--- gateway/gwlib/cfg.def Mon Dec 12 21:11:04 2005
+++ ../src-to-diff/gwlib/cfg.def Fri Dec 16 18:33:23 2005
@@ -271,6 +271,7 @@
OCTSTR(white-list-regex)
OCTSTR(black-list-regex)
OCTSTR(immediate-sendsms-reply)
+ OCTSTR(bulk-retry-q)
)