Dear List,
I'm trying to develop a new "box" that I'm planning to contribute it GPL'ed
to the project when it's ready. My goal is a box that reads MT messages from
a shared memory segment and injects them into kannel. This way, a number of
interfaces could be easily done that write to that shared memory segment to
send messages.
I'm pretty close to make it work, and I even developed a PHP module that
implements the function "kannel_send_sms()" on PHP. I've called the box
"phpbox", but I'll probably rename it to "membox", since it can be used with
any language, as long as it writes messages to the shm.
The performance improvement over the HTTP interface and even over sqlbox are
impressive (though it's only for MT, of course). It makes sense, since all
negotiation is done on memory, so it could be a very fast method to inject
MT messages.
I've done most of the job, I've borrowed the structure from
sqlbox-standalone (Renee and Martin's work, credited accordingly) and I've
mostly succeeded in developing the box and php interface, but after I've
started testing it with thousands of messages, I've discovered a race
condition, probably having to do with semaphores.
I'm copying relevant code at the bottom. It's two pieces of code: the server
(aka phpbox, runs as a box) and the client (a test program written in C that
enqueues 100K messages).
Everything works fine so far, but when I try running two instances of the
client program (and sometimes with a single instance), I'm starting getting
the same message (in fact, it's the last message that replaces all the
previous).
To be clear, the client program does:
for(c=1;c<=100000;c++) {
asprintf(&ms, "Hello %d", c);
send_sms(ms);
}
So I should receive:
Hello 1
Hello 2
...
Hello 100000
But instead, I receive:
Hello 1
Hello 2
...
Hello 23456
Hello 100000
Hello 100000
...
Hello 100000
Hello 100000
No messages are lost, but somewhat the shared memory area is being
overwritten before it's being read, I suppose.
I'm a total newby at semaphores, and I've borrowed the semaphore code from a
few examples floating on the net, so I suppose I'm doing something wrong
there.
I've also noticed gwlib have a semaphore implementation, but didn't know
where to start or how to use it. Maybe regular semaphores and gwlib doesn't
go along well?
Could someone take a look at the code and tell me if there's something
wrong, or how to rewrite it to use gwlib's semaphores? If you need the full
(flawed) source code I'll gladly send it.
I've had to hex-encode the msg structure to pass it through the shared
memory as a string. I don't particularily like it, so if anyone has a better
way to pass a variable-length (probably with nulls in the middle) Octstr
through a shared memory segment, please let me know.
Thank you in advance,
Alejandro
--
Alejandro Guerrieri
Magicom
http://www.magicom-bcn.net/
LinkedIn: http://www.linkedin.com/in/aguerrieri
++++++++++++++++++++++++++++++++++++++++
/*
*
* SERVER SIDE
*
*/
....
struct shmid_ds shmid_struct;
static int semid;
static int shmid;
static void *shm_address;
#define SEMKEYPATH "/usr/local/bin/phpbox"
#define SEMKEYID 1
#define SHMKEYPATH "/usr/local/bin/phpbox"
#define SHMKEYID 1
#define NUMSEMS 2
#define SIZEOFSHMSEG 65536
....
static void phpboxc_run()
{
int ret;
int timeout = 10;
struct sembuf op[2];
Octstr *os;
Msg *msg;
char *str;
Boxc *boxc;
boxc = gw_malloc(sizeof(Boxc));
boxc->bearerbox_connection = connect_to_bearerbox_real(bearerbox_host,
bearerbox_port, bearerbox_port_ssl, NULL);
boxc->smsbox_connection = NULL;
boxc->client_ip = NULL;
boxc->alive = 1;
boxc->connect_time = time(NULL);
boxc->boxc_id = octstr_duplicate(phpbox_id);
if (boxc->bearerbox_connection == NULL) {
boxc_destroy(boxc);
return;
}
identify_to_bearerbox(boxc);
init_shm();
while(phpbox_status != PHP_DEAD) {
op[0].sem_num = 1;
op[0].sem_op = -1;
op[0].sem_flg = 0;
op[1].sem_num = 0;
op[1].sem_op = 1;
op[1].sem_flg = IPC_NOWAIT;
if (semop( semid, op, 2 ) == -1) {
printf("main: semop(2) failed\n");
phpbox_status = PHP_SHUTDOWN;
ret = -1;
break;
}
os = octstr_create((char *) shm_address);
octstr_hex_to_binary(os);
msg = msg_unpack(os);
send_msg(boxc->bearerbox_connection, boxc, msg);
msg_destroy(msg);
octstr_destroy(os);
op[0].sem_num = 0;
op[0].sem_op = -1;
op[0].sem_flg = IPC_NOWAIT;
if (semop( semid, op, 1 ) == -1) {
printf("main: semop(1) failed\n");
phpbox_status = PHP_SHUTDOWN;
ret = -1;
break;
}
if (phpbox_status == PHP_SHUTDOWN) {
if (ret == -1 || !timeout)
break;
else
timeout--;
}
}
shutdown_shm();
boxc_destroy(boxc);
}
int init_shm() {
int rc;
key_t semkey, shmkey;
short sarray[NUMSEMS];
semkey = ftok(SEMKEYPATH,SEMKEYID);
if ( semkey == (key_t)-1 )
{
printf("main: ftok() for sem failed\n");
return -1;
}
shmkey = ftok(SHMKEYPATH,SHMKEYID);
if ( shmkey == (key_t)-1 )
{
printf("main: ftok() for shm failed\n");
return -1;
}
semid = semget( semkey, NUMSEMS, 0666 | IPC_CREAT | IPC_EXCL );
if ( semid == -1 )
{
printf("main: semget() failed\n");
return -1;
}
sarray[0] = 0;
sarray[1] = 0;
rc = semctl( semid, 1, SETALL, sarray);
if(rc == -1)
{
printf("main: semctl() initialization failed\n");
return -1;
}
shmid = shmget(shmkey, SIZEOFSHMSEG, 0666 | IPC_CREAT | IPC_EXCL);
if (shmid == -1)
{
printf("main: shmget() failed\n");
return -1;
}
shm_address = shmat(shmid, NULL, 0);
if ( shm_address==NULL )
{
printf("main: shmat() failed\n");
return -1;
}
printf("Ready for client jobs\n");
}
int shutdown_shm() {
int rc;
rc = semctl( semid, 1, IPC_RMID );
if (rc==-1) {
printf("main: semctl() remove id failed\n");
return -1;
}
rc = shmdt(shm_address);
if (rc==-1) {
printf("main: shmdt() failed\n");
return -1;
}
rc = shmctl(shmid, IPC_RMID, &shmid_struct);
if (rc==-1) {
printf("main: shmctl() failed\n");
return -1;
}
return 0;
}
/*
*
* CLIENT SIDE
*
*/
#include <stdio.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include "gw/msg.h"
#include "gwlib/gwlib.h"
#define SEMKEYPATH "/usr/local/bin/phpbox"
#define SEMKEYID 1
#define SHMKEYPATH "/usr/local/bin/phpbox"
#define SHMKEYID 1
#define NUMSEMS 2
#define SIZEOFSHMSEG 65536
int main() {
int c;
char *ms;
for(c=1;c<=100000;c++) {
asprintf(&ms, "Hello %d", c);
send_sms(ms);
}
}
int send_sms(char *mess) {
struct sembuf ops[2];
void *mem_addr;
int semid, shmid, rc;
key_t semkey, shmkey;
Msg *msg;
Octstr *os;
char *str;
semkey = ftok(SEMKEYPATH,SEMKEYID);
if ( semkey == (key_t)-1 ){
printf("main: ftok() for sem failed\n");
return -1;
}
shmkey = ftok(SHMKEYPATH,SHMKEYID);
if ( shmkey == (key_t)-1 ) {
printf("main: ftok() for shm failed\n");
return -1;
}
semid = semget( semkey, NUMSEMS, 0666);
if ( semid == -1 ) {
printf("main: semget() failed\n");
return -1;
}
shmid = shmget(shmkey, SIZEOFSHMSEG, 0666);
if (shmid == -1) {
printf("main: shmget() failed\n");
return -1;
}
mem_addr = shmat(shmid, NULL, 0);
if ( mem_addr==NULL ) {
printf("main: shmat() failed\n");
return -1;
}
ops[0].sem_num = 0;
ops[0].sem_op = 0;
ops[0].sem_flg = 0;
ops[1].sem_num = 0;
ops[1].sem_op = 1;
ops[1].sem_flg = 0;
rc = semop( semid, ops, 2 );
if (rc == -1) {
printf("main: semop() failed\n");
return -1;
}
octstr_init();
msg = msg_create(sms);
msg->sms.sender = octstr_create("123");
msg->sms.receiver = octstr_create("456");
msg->sms.msgdata = octstr_create(mess);
msg->sms.smsc_id = octstr_create("test");
os = msg_pack(msg);
octstr_binary_to_hex(os,1);
strcpy((char *) mem_addr, octstr_get_cstr(os));
octstr_destroy(os);
msg_destroy(msg);
octstr_shutdown();
ops[0].sem_num = 0;
ops[0].sem_op = -1;
ops[0].sem_flg = 0;
ops[1].sem_num = 1;
ops[1].sem_op = 1;
ops[1].sem_flg = 0;
rc = semop( semid, ops, 2 );
if (rc == -1) {
printf("main: semop() failed\n");
return -1;
}
rc = shmdt(mem_addr);
if (rc==-1) {
printf("main: shmdt() failed\n");
return -1;
}
return 0;
}
++++++++++++++++++++++++++++++++++++++++