Hi Andrei/Bogdan/Anca,
I'm trying to add the atomic cache_increment/cache_decrement methods to the
memcached module, which currently has cache_store, cache_remove, cache_fetch
methods.
I have the following doubts, can someone please clarify :
1. cache_fetch, cache_store and cache_remove are not exported by the
memcached module anywhere. How then are we able to use them from
opensips.cfg ?
2. I exported cache_increment/cache_decrement as *cmds *anyway, and was
atleast able to *access them from opensips.cfg. However, they gave back NO
SERVERS DEFINED * as error back from the libmemcached module. The
cache_fetch, cache_remove and cache_store methods work properly.
I have made changes to *modules/memcached/memcached.c , memcache.c and
memcache.h for including my functions. *
I am attaching the three files here, if they are any help..
Thanks a lot,
Abhinav
/*
* $Id: memcached.c 6232 2009-10-07 15:51:38Z andreidragus $
*
* Copyright (C) 2009 Voice Sistem SRL
* Copyright (C) 2009 Andrei Dragus
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
* History:
* ---------
* 2009-07-15 first version (andreidragus)
*/
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "../../dprint.h"
#include "../../timer.h"
#include "../../error.h"
#include "../../ut.h"
#include "../../sr_module.h"
#include "../../mem/mem.h"
#include "../../mem/shm_mem.h"
#include "../../memcache.h"
#include <libmemcached/memcached.h>
typedef struct mem_server_list_t
{
char * servers;
char * name;
memcached_st * memc;
struct mem_server_list_t * next;
}mem_server;
static int mod_init(void);
static int child_init(int rank);
static void destroy(void);
static int add_server( unsigned int type, void *val);
static void wrap_memcached_increment(str*, void*);
static void wrap_memcached_decrement(str*, void*);
mem_server * servers;
/** module parameters */
static param_export_t params[]={
{"server", STR_PARAM|USE_FUNC_PARAM, (void*)&add_server },
{0,0,0}
};
static cmd_export_t cmds[]= {
{"cache_increment", (cmd_function)wrap_memcached_increment, 2, 0, 0, REQUEST_ROUTE|FAILURE_ROUTE},
{"cache_decrement", (cmd_function)wrap_memcached_decrement, 2, 0, 0, REQUEST_ROUTE|FAILURE_ROUTE},
{ 0, 0, 0, 0, 0, 0}
};
/** module exports */
struct module_exports exports= {
"memcached", /* module name */
MODULE_VERSION,
DEFAULT_DLFLAGS, /* dlopen flags */
0, /* exported functions */
params, /* exported parameters */
0, /* exported statistics */
0, /* exported MI functions */
0, /* exported pseudo-variables */
0, /* extra processes */
mod_init, /* module initialization function */
(response_function) 0, /* response handling function */
(destroy_function) destroy, /* destroy function */
child_init /* per-child init function */
};
int wrap_memcached_insert(str* attr, str* value,
unsigned int expires,void * memc)
{
memcached_return rc;
rc = memcached_set((memcached_st*)memc,attr->s, attr->len , value->s,
value->len, (time_t)expires, (uint32_t)0);
if( rc != MEMCACHED_SUCCESS)
{
LM_ERR("Failed to insert: %s\n",memcached_strerror(memc,rc));
return -1;
}
return 1;
}
void wrap_memcached_decrement(str* attr,void * memc)
{
memcached_return rc;
int num;
if (memc == NULL){
LM_ERR(" OK, memc is nULL ");
}
rc = memcached_decrement((memcached_st*)memc, attr->s, attr->len, 1, &num);
if( rc != MEMCACHED_SUCCESS )
{
LM_ERR("Failed to decrement: %s\n",memcached_strerror(memc,rc));
}
}
void wrap_memcached_increment(str* attr,void * memc)
{
memcached_return rc;
int num;
if (memc == NULL){
LM_ERR(" OK, memc is nULL ");
}
rc = memcached_increment((memcached_st*)memc, attr->s, attr->len, 1, &num);
if( rc != MEMCACHED_SUCCESS )
{
LM_ERR("Failed to increment: %s\n",memcached_strerror(memc,rc));
}
}
void wrap_memcached_remove(str* attr,void * memc)
{
memcached_return rc;
rc = memcached_delete((memcached_st*)memc,attr->s,attr->len,0);
if( rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND)
{
LM_ERR("Failed to remove: %s\n",memcached_strerror(memc,rc));
}
}
int wrap_memcached_get(str* attr, str* res,void * memc)
{
memcached_return rc;
char * ret;
size_t ret_len;
uint32_t fl;
char * err;
char * value;
ret = memcached_get((memcached_st*)memc,attr->s, attr->len,
&ret_len,&fl,&rc);
if(ret == NULL)
{
if(rc == MEMCACHED_NOTFOUND)
{
res->s = NULL;
res->len = 0;
return -2;
}
else
{
err = memcached_strerror(memc,rc);
LM_ERR("Failed to get: %s\n",err );
return -1;
}
}
value = pkg_malloc(ret_len);
if( value == NULL)
{
LM_ERR("Memory allocation");
return -1;
}
memcpy(value,ret,ret_len);
res->s = value;
res->len = ret_len;
free(ret);
return 1;
}
/*
* Parse method for parameters.
* Parameters should be in the form:
* [$WHITESPACE]$NAME[$WHITESPACE]=[$WHITESPACE]$LIST
* where $LIST = list of comma separated $HOST[:$PORT]
* E.g: x= localhost:9999,localhost,192.168.2.136:8888
*
* */
int parse_param(char* source, char **res_name,char ** res_value)
{
char *name,*value;
char *name_start,*value_start;
int name_len, value_len;
char * err;
/* parse first whitespace */
while(isspace(*source))
{
if(*source == 0)
{
err = "Missing name";
goto parse_error;
}
source++;
}
/* extract name */
name_start = source;
name_len = 0;
while( !isspace(*source) && *source != '=')
{
if(*source == 0)
{
err = "Missing '='";
goto parse_error;
}
source++;
name_len++;
}
/* parse second whitespace equal sign and third whitespace */
while(isspace(*source) || *source == '=')
{
if(*source == 0)
{
err = "Missing value";
goto parse_error;
}
source++;
}
/* parse list of adresses */
value_start = source;
value_len = 0;
while(*source)
{
if(*source == 0)
{
err = "Missing value";
goto parse_error;
}
source++;
value_len++;
}
//place them in the desired retun fields
name = (char*) pkg_malloc(name_len+1);
if( name == NULL)
{
LM_ERR("Memory allocation");
return -1;
}
value = (char*) pkg_malloc(value_len+1);
if( value == NULL)
{
LM_ERR("Memory allocation");
return -1;
}
memcpy(name,name_start,name_len);
name[name_len]=0;
memcpy(value,value_start,value_len);
value[value_len]=0;
*res_name = name;
*res_value = value;
LM_DBG("Name: %s\n",*res_name);
LM_DBG("Value: %s\n",*res_value);
return 0;
parse_error:
LM_ERR("Parameter parse error - %s\n",err);
return -1;
}
/*
*
* Method that adds a group of servers to the server list.
* It also allocates a new memcached_st structure that will
* be used for initialization in each child
*
*
* */
int add_server( unsigned int type, void *val)
{
char * name;
char * value;
int ret;
mem_server * node;
ret = parse_param((char*)val,&name,&value);
if( ret != 0)
return ret;
node = pkg_malloc(sizeof(mem_server));
node->next = servers;
node->memc = pkg_malloc(sizeof(memcached_st));
node->name = name;
node->servers = value;
servers = node;
return 0;
}
/**
* init module function
*/
static int mod_init(void)
{
memcache_t ms;
mem_server *cur = servers;
/* register each cache system */
while(cur)
{
/* form the new name as "memcached_$NAME" */
int full_len = 10+strlen(cur->name);
char * full_name = (char*)pkg_malloc( full_len +1);
sprintf(full_name,"memcached_%s",cur->name);
/* set the information required */
ms.name.s = full_name;
ms.name.len = full_len;
ms.store = wrap_memcached_insert;
ms.remove = wrap_memcached_remove;
ms.fetch = wrap_memcached_get;
ms.increment = wrap_memcached_increment;
ms.decrement = wrap_memcached_decrement;
ms.data = cur->memc;
if( register_memcache(&ms)< 0)
{
LM_ERR("failed to register to core memory "
"store interface\n");
return -1;
}
cur = cur -> next;
}
return 0;
}
/**
* Initialize children
*/
static int child_init(int rank)
{
memcached_return rc;
memcached_server_st *server_list;
mem_server *cur;
if(rank == PROC_MAIN || rank == PROC_TCP_MAIN)
{
return 0;
}
/* for each cache system and each child initialize the
* memcached_st structure that was previously allocated
*/
cur = servers;
while(cur)
{
cur->memc = memcached_create(cur->memc);
server_list = memcached_servers_parse( cur->servers );
rc = memcached_server_push(cur->memc, server_list);
if( rc != MEMCACHED_SUCCESS)
{
LM_ERR("Push:%s\n",memcached_strerror(cur->memc,rc));
return -1;
}
rc = memcached_behavior_set(cur->memc,
MEMCACHED_BEHAVIOR_NO_BLOCK,1);
if( rc != MEMCACHED_SUCCESS)
{
LM_ERR("Behavior Set:%s\n",
memcached_strerror(cur->memc,rc));
return -1;
}
cur = cur->next;
}
return 0;
}
/*
* destroy function
*/
static void destroy(void)
{
return;
}
/*
* $Id: memcache.h 5855 2009-07-15 08:30:12Z andreidragus $
*
* Copyright (C) 2009 Anca Vamanu
*
* This file is part of opensips, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
* History:
* ---------
* 2009-01-29 first version (Anca Vamanu)
*/
#ifndef _MEM_CACHE_H_
#define _MEM_CACHE_H_
#include "str.h"
typedef int (memcache_store_f)(str* name, str* value, unsigned int expires,void *data);
typedef void (memcache_remove_f)(str* name,void *data);
typedef int (memcache_fetch_f)(str* name, str* val,void *data);
// Adding support for memcache increment and decrement functions
typedef void (memcache_increment_f)(str* name, void *data);
typedef void (memcache_decrement_f)(str* name, void *data);
typedef struct memcache {
str name;
memcache_store_f* store;
memcache_remove_f* remove;
memcache_fetch_f* fetch;
// Adding support for memcache increment and decrement functions.
memcache_increment_f* increment;
memcache_decrement_f* decrement;
void *data;
}memcache_t;
int register_memcache(memcache_t* cs);
/* functions to be used from script */
int cache_store(str* memcache, str* attr, str* val, unsigned int expires);
int cache_remove(str* memcache, str* attr);
int cache_fetch(str* memcache, str* attr, str* val);
// Adding support for memcache increment and decrement functions.
int cache_increment(str* memcache, str* attr);
int cache_decrement(str* memcache, str* attr);
#endif
/*
* $Id: memcache.c 5855 2009-07-15 08:30:12Z andreidragus $
*
* Copyright (C) 2009 Anca Vamanu
*
* This file is part of OpenSIPS, a free SIP server.
*
* opensips is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* opensips is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*
* History:
* ---------
* 2009-01-29 first version (Anca Vamanu)
*/
/*
* OpenSIPS Management Memory Store
*
* The OpenSIPS memory store is a interface to memcache systems used for to
* alleviate database load by storing objects in memory.
*
* Modules containing specific memcache implementations must register
* the control functions to this interface.
*
*/
#include <stdlib.h>
#include <string.h>
#include "dprint.h"
#include "mem/mem.h"
#include "memcache.h"
struct memcache_node
{
memcache_t cs;
struct memcache_node* next;
};
static struct memcache_node* memcache_list = NULL;
static inline memcache_t* lookup_memcache(str name)
{
struct memcache_node* cs_node;
cs_node = memcache_list;
while(cs_node)
{
if (name.len == cs_node->cs.name.len &&
strncmp(name.s, cs_node->cs.name.s, name.len) == 0)
return &cs_node->cs;
cs_node = cs_node->next;
}
return 0;
}
int register_memcache(memcache_t* cs_entry)
{
struct memcache_node* cs_node;
if(cs_entry == NULL)
{
LM_ERR("null argument\n");
return -1;
}
if (lookup_memcache( cs_entry->name))
{
LM_ERR("memcache system <%.*s> already registered\n",
cs_entry->name.len, cs_entry->name.s);
return -1;
}
cs_node = (struct memcache_node*)pkg_malloc(
sizeof(struct memcache_node) + cs_entry->name.len);
if (cs_node== NULL)
{
LM_ERR("no more shared memory\n");
return -1;
}
cs_node->cs.name.s = (char*)cs_node + sizeof(struct memcache_node);
memcpy(cs_node->cs.name.s, cs_entry->name.s, cs_entry->name.len);
cs_node->cs.name.len = cs_entry->name.len;
cs_node->cs.store = cs_entry->store;
cs_node->cs.remove = cs_entry->remove;
cs_node->cs.fetch = cs_entry->fetch;
cs_node->cs.data = cs_entry -> data;
cs_node->cs.increment = cs_entry->increment;
cs_node->cs.decrement = cs_entry->decrement;
cs_node->next = memcache_list;
memcache_list = cs_node;
LM_DBG("registered cache system [%.*s]\n", cs_node->cs.name.len,
cs_node->cs.name.s);
return 0;
}
int cache_store(str* memcache_system, str* attr, str* val,
unsigned int expires)
{
memcache_t* cs;
if(memcache_system == NULL || attr == NULL || val == NULL)
{
LM_ERR("null arguments\n");
return -1;
}
cs = lookup_memcache(*memcache_system);
if(cs == NULL)
{
LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
" this name registered\n",
memcache_system->len,memcache_system->s);
return -1;
}
return cs->store(attr, val, expires,cs->data);
}
int cache_remove(str* memcache_system, str* attr)
{
memcache_t* cs;
if(memcache_system == NULL || attr == NULL)
{
LM_ERR("null arguments\n");
return -1;
}
cs = lookup_memcache(*memcache_system);
if(cs == NULL)
{
LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
" this name registered\n",
memcache_system->len,memcache_system->s);
return -1;
}
cs->remove(attr,cs->data);
return 1;
}
int cache_fetch(str* memcache_system, str* attr, str* val)
{
memcache_t* cs;
if(memcache_system == NULL || attr == NULL )
{
LM_ERR("null arguments\n");
return -1;
}
cs = lookup_memcache(*memcache_system);
if(cs == NULL)
{
LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
" this name registered\n",
memcache_system->len,memcache_system->s);
return -1;
}
return cs->fetch(attr, val,cs->data);
}
int cache_increment(str *memcache_system, str* attr){
memcache_t* cs;
if(memcache_system == NULL || attr == NULL){
LM_ERR("null arguments\n");
return -1;
}
cs = lookup_memcache(*memcache_system);
if(cs == NULL){
LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
" this name registered\n",
memcache_system->len, memcache_system->s);
return -1;
}
cs->increment(attr, cs->data);
}
int cache_decrement(str *memcache_system, str* attr){
memcache_t* cs;
if(memcache_system == NULL || attr == NULL){
LM_ERR("null arguments\n");
return -1;
}
cs = lookup_memcache(*memcache_system);
if(cs == NULL){
LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
" this name registered\n",
memcache_system->len, memcache_system->s);
return -1;
}
cs->decrement(attr, cs->data);
}
_______________________________________________
Devel mailing list
[email protected]
http://lists.opensips.org/cgi-bin/mailman/listinfo/devel