Hi Andrei/Bogdan/Anca,

I'm trying to add the atomic cache_increment/cache_decrement methods to the
memcached module, which currently has cache_store, cache_remove, cache_fetch
methods.
I have the following doubts, can someone please clarify :

1. cache_fetch, cache_store and cache_remove are not exported by the
memcached module anywhere. How then are we able to use them from
opensips.cfg ?
2. I exported cache_increment/cache_decrement as *cmds *anyway, and was
atleast able to *access them from opensips.cfg. However, they gave back NO
SERVERS DEFINED * as error back from the libmemcached module. The
cache_fetch, cache_remove and cache_store methods work properly.

I have made changes to *modules/memcached/memcached.c , memcache.c and
memcache.h  for including my functions. *
I am attaching the three files here, if they are any help..

Thanks a lot,
Abhinav
/*
 * $Id: memcached.c 6232 2009-10-07 15:51:38Z andreidragus $
 *
 * Copyright (C) 2009 Voice Sistem SRL
 * Copyright (C) 2009 Andrei Dragus
 *
 * This file is part of opensips, a free SIP server.
 *
 * opensips is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * opensips is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 *
 * History:
 * ---------
 *  2009-07-15  first version (andreidragus)
 */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "../../dprint.h"
#include "../../timer.h"
#include "../../error.h"
#include "../../ut.h"
#include "../../sr_module.h"
#include "../../mem/mem.h"
#include "../../mem/shm_mem.h"
#include "../../memcache.h"
#include <libmemcached/memcached.h>






typedef struct mem_server_list_t
{
	char * servers;
	char * name;
	memcached_st * memc;
	struct mem_server_list_t * next;

}mem_server;




static int mod_init(void);
static int child_init(int rank);
static void destroy(void);
static int add_server( unsigned int type, void *val);
static void wrap_memcached_increment(str*, void*);
static void wrap_memcached_decrement(str*, void*);

mem_server * servers;

/** module parameters */
static param_export_t params[]={
	{"server",        STR_PARAM|USE_FUNC_PARAM, (void*)&add_server },
	{0,0,0}
};


static cmd_export_t cmds[]= {
	{"cache_increment",  (cmd_function)wrap_memcached_increment,  2, 0, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{"cache_decrement",  (cmd_function)wrap_memcached_decrement,  2, 0, 0, REQUEST_ROUTE|FAILURE_ROUTE},
	{ 0,      0,                 0,     0,         0,  0}
};

/** module exports */
struct module_exports exports= {
	"memcached",               /* module name */
	MODULE_VERSION,
	DEFAULT_DLFLAGS,            /* dlopen flags */
	0,                          /* exported functions */
	params,                     /* exported parameters */
	0,                          /* exported statistics */
	0,                          /* exported MI functions */
	0,                          /* exported pseudo-variables */
	0,                          /* extra processes */
	mod_init,                   /* module initialization function */
	(response_function) 0,      /* response handling function */
	(destroy_function) destroy, /* destroy function */
	child_init                  /* per-child init function */
};


int wrap_memcached_insert(str* attr, str* value,
				unsigned int expires,void * memc)
{
	memcached_return  rc;

	rc = memcached_set((memcached_st*)memc,attr->s, attr->len , value->s,
				value->len, (time_t)expires, (uint32_t)0);

	if( rc != MEMCACHED_SUCCESS)
	{
		LM_ERR("Failed to insert: %s\n",memcached_strerror(memc,rc));
		return -1;
	}

	return 1;
}

void wrap_memcached_decrement(str* attr,void * memc)
{
	memcached_return  rc;
        int num;
	if (memc == NULL){
		LM_ERR(" OK, memc is nULL ");
	}
	rc = memcached_decrement((memcached_st*)memc, attr->s, attr->len, 1, &num);

	if( rc != MEMCACHED_SUCCESS )
	{
		LM_ERR("Failed to decrement: %s\n",memcached_strerror(memc,rc));
	}
}

void wrap_memcached_increment(str* attr,void * memc)
{
	memcached_return  rc;
	int num;
	if (memc == NULL){
		LM_ERR(" OK, memc is nULL ");
	}
	rc = memcached_increment((memcached_st*)memc, attr->s, attr->len, 1, &num);

	if( rc != MEMCACHED_SUCCESS )
	{
		LM_ERR("Failed to increment: %s\n",memcached_strerror(memc,rc));
	}
}

void wrap_memcached_remove(str* attr,void * memc)
{
	memcached_return  rc;

	rc = memcached_delete((memcached_st*)memc,attr->s,attr->len,0);

	if( rc != MEMCACHED_SUCCESS && rc != MEMCACHED_NOTFOUND)
	{
		LM_ERR("Failed to remove: %s\n",memcached_strerror(memc,rc));
	}
}

int wrap_memcached_get(str* attr, str* res,void * memc)
{
	memcached_return  rc;
	char * ret;
	size_t ret_len;
	uint32_t fl;
	char * err;
	char * value;
    


    
	ret = memcached_get((memcached_st*)memc,attr->s, attr->len,
				&ret_len,&fl,&rc);



	if(ret == NULL)
	{
		if(rc == MEMCACHED_NOTFOUND)
		{
			res->s = NULL;
			res->len = 0;
			return -2;
		}
		else
		{
			err = memcached_strerror(memc,rc);
			LM_ERR("Failed to get: %s\n",err );
			return -1;
		}
	}

	
	value = pkg_malloc(ret_len);
	if( value == NULL)
	{
		LM_ERR("Memory allocation");
		return -1;
	}

	memcpy(value,ret,ret_len);
	res->s = value;
	res->len = ret_len;

	free(ret);

	return 1;
}


/*
 * Parse method for parameters.
 * Parameters should be in the form:
 * [$WHITESPACE]$NAME[$WHITESPACE]=[$WHITESPACE]$LIST
 * where $LIST = list of comma separated $HOST[:$PORT]
 * E.g: x= localhost:9999,localhost,192.168.2.136:8888
 *
 * */
int parse_param(char* source, char **res_name,char ** res_value)
{
	char *name,*value;
	char *name_start,*value_start;
	int name_len, value_len;
	char * err;

	/* parse first whitespace */
	while(isspace(*source))
	{
		if(*source == 0)
		{
			err = "Missing name";
			goto parse_error;
		}
		source++;

	}

	/* extract name */
	name_start = source;
	name_len = 0;

	while( !isspace(*source) && *source != '=')
	{
		if(*source == 0)
		{
			err = "Missing '='";
			goto parse_error;
		}
		source++;
		name_len++;

	}

	/* parse second whitespace equal sign and third whitespace */
	while(isspace(*source) || *source == '=')
	{
		if(*source == 0)
		{
			err = "Missing value";
			goto parse_error;
		}
		source++;

	}

	/* parse list of adresses */
	value_start = source;
	value_len = 0;

	while(*source)
	{
		if(*source == 0)
		{
			err = "Missing value";
			goto parse_error;
		}
		source++;
		value_len++;
	}

	//place them in the desired retun fields
	name = (char*) pkg_malloc(name_len+1);
	if( name == NULL)
	{
		LM_ERR("Memory allocation");
		return -1;
	}

	value = (char*) pkg_malloc(value_len+1);
	if( value == NULL)
	{
		LM_ERR("Memory allocation");
		return -1;
	}

	memcpy(name,name_start,name_len);
	name[name_len]=0;

	memcpy(value,value_start,value_len);
	value[value_len]=0;

	*res_name = name;
	*res_value = value;

	LM_DBG("Name: %s\n",*res_name);
	LM_DBG("Value: %s\n",*res_value);


	return 0;
parse_error:
	LM_ERR("Parameter parse error - %s\n",err);
	return -1;

}

/*
 *
 * Method that adds a group of servers  to the server list.
 * It also allocates a new memcached_st structure that will
 * be used for initialization in each child
 *
 *
 * */
int add_server( unsigned int type, void *val)
{
	char * name;
	char * value;
	int ret;
	mem_server * node;

	ret = parse_param((char*)val,&name,&value);

	if( ret != 0)
		return ret;


	node = pkg_malloc(sizeof(mem_server));

	node->next = servers;
	node->memc = pkg_malloc(sizeof(memcached_st));
	node->name = name;
	node->servers = value;

	servers = node;
	return 0;
    
}

/**
 * init module function
 */

static int mod_init(void)
{
	memcache_t ms;
	mem_server *cur = servers;

	/* register each cache system */

	while(cur)
	{
		/* form the new name as "memcached_$NAME" */
		int full_len = 10+strlen(cur->name);
		char * full_name = (char*)pkg_malloc( full_len +1);
		sprintf(full_name,"memcached_%s",cur->name);

		/* set the information required */
		ms.name.s = full_name;
		ms.name.len = full_len;
		ms.store = wrap_memcached_insert;
		ms.remove = wrap_memcached_remove;
		ms.fetch = wrap_memcached_get;
		ms.increment = wrap_memcached_increment;
		ms.decrement = wrap_memcached_decrement;
		ms.data = cur->memc;

		if( register_memcache(&ms)< 0)
		{
			LM_ERR("failed to register to core memory "
					"store interface\n");
			return -1;
		}

		cur = cur -> next;

	}


	return 0;
}

/**
 * Initialize children
 */
static int child_init(int rank)
{

	memcached_return  rc;
	memcached_server_st *server_list;

	mem_server *cur;

	if(rank == PROC_MAIN || rank == PROC_TCP_MAIN)
	{
		return 0;
	}

	
	/* for each cache system and each child initialize the
	*  memcached_st structure that was previously allocated
	*/

	cur = servers;

	while(cur)
	{
		cur->memc = memcached_create(cur->memc);
		server_list = memcached_servers_parse( cur->servers );
		rc = memcached_server_push(cur->memc, server_list);

		if( rc != MEMCACHED_SUCCESS)
		{
			LM_ERR("Push:%s\n",memcached_strerror(cur->memc,rc));
			return -1;
		}


		rc = memcached_behavior_set(cur->memc,
			MEMCACHED_BEHAVIOR_NO_BLOCK,1);

		if( rc != MEMCACHED_SUCCESS)
		{
			LM_ERR("Behavior Set:%s\n",
				memcached_strerror(cur->memc,rc));
			return -1;
		}

		cur = cur->next;
	}

	return 0;
}

/*
 * destroy function
 */
static void destroy(void)
{
	return;
}
/*
 * $Id: memcache.h 5855 2009-07-15 08:30:12Z andreidragus $
 *
 * Copyright (C) 2009 Anca Vamanu
 *
 * This file is part of opensips, a free SIP server.
 *
 * opensips is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * opensips is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 *
 * History:
 * ---------
 *  2009-01-29  first version (Anca Vamanu)
 */
#ifndef _MEM_CACHE_H_
#define _MEM_CACHE_H_

#include "str.h"

typedef int (memcache_store_f)(str* name, str* value, unsigned int expires,void *data);
typedef void (memcache_remove_f)(str* name,void *data);
typedef int (memcache_fetch_f)(str* name, str* val,void *data);

// Adding support for memcache increment and decrement functions
typedef void (memcache_increment_f)(str* name, void *data);
typedef void (memcache_decrement_f)(str* name, void *data);

typedef struct memcache {
	str name;
	memcache_store_f* store;
	memcache_remove_f* remove;
	memcache_fetch_f* fetch;
        // Adding support for memcache increment and decrement functions.
        memcache_increment_f* increment;
        memcache_decrement_f* decrement;
	void *data;
}memcache_t;


int register_memcache(memcache_t* cs);

/* functions to be used from script */
int cache_store(str* memcache, str* attr, str* val, unsigned int expires);
int cache_remove(str* memcache, str* attr);
int cache_fetch(str* memcache, str* attr, str* val);

// Adding support for memcache increment and decrement functions.
int cache_increment(str* memcache, str* attr);
int cache_decrement(str* memcache, str* attr);

#endif
/*
 * $Id: memcache.c 5855 2009-07-15 08:30:12Z andreidragus $
 *
 * Copyright (C) 2009 Anca Vamanu
 *
 * This file is part of OpenSIPS, a free SIP server.
 *
 * opensips is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * opensips is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
 *
 *
 * History:
 * ---------
 *  2009-01-29  first version (Anca Vamanu)
 */

/*
 * OpenSIPS Management Memory Store
 *
 * The OpenSIPS memory store is a interface to memcache systems used for to
 * alleviate database load by storing objects in memory.
 *
 * Modules containing specific memcache implementations must register
 * the control functions to this interface.
 *
 */

#include <stdlib.h>
#include <string.h>

#include "dprint.h"
#include "mem/mem.h"
#include "memcache.h"

struct memcache_node 
{
	memcache_t cs;
	struct memcache_node* next;
};

static struct memcache_node* memcache_list = NULL;

static inline memcache_t* lookup_memcache(str name)
{
	struct memcache_node* cs_node;

	cs_node = memcache_list;

	while(cs_node)
	{
		if (name.len == cs_node->cs.name.len &&
				strncmp(name.s, cs_node->cs.name.s, name.len) == 0)
			return &cs_node->cs;

		cs_node = cs_node->next;
	}

	return 0;
}


int register_memcache(memcache_t* cs_entry)
{
	struct memcache_node* cs_node;

	if(cs_entry == NULL)
	{
		LM_ERR("null argument\n");
		return -1;
	}

	if (lookup_memcache( cs_entry->name))
	{
		LM_ERR("memcache system <%.*s> already registered\n",
				cs_entry->name.len, cs_entry->name.s);
		return -1;
	}

	cs_node = (struct memcache_node*)pkg_malloc(
		sizeof(struct memcache_node) + cs_entry->name.len);
	if (cs_node== NULL)
	{
		LM_ERR("no more shared memory\n");
		return -1;
	}

	cs_node->cs.name.s = (char*)cs_node + sizeof(struct memcache_node);
	memcpy(cs_node->cs.name.s, cs_entry->name.s, cs_entry->name.len);
	cs_node->cs.name.len = cs_entry->name.len;

	cs_node->cs.store = cs_entry->store;
	cs_node->cs.remove = cs_entry->remove;
	cs_node->cs.fetch = cs_entry->fetch;
	cs_node->cs.data = cs_entry -> data;
        cs_node->cs.increment = cs_entry->increment;
        cs_node->cs.decrement = cs_entry->decrement;

	cs_node->next = memcache_list;
	memcache_list = cs_node;

	LM_DBG("registered cache system [%.*s]\n", cs_node->cs.name.len,
			cs_node->cs.name.s);

	return 0;
}

int cache_store(str* memcache_system, str* attr, str* val,
		unsigned int expires)
{
	memcache_t* cs;

	if(memcache_system == NULL || attr == NULL || val == NULL)
	{
		LM_ERR("null arguments\n");
		return -1;
	}

	cs = lookup_memcache(*memcache_system);
	if(cs == NULL)
	{
		LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
				" this name registered\n",
				memcache_system->len,memcache_system->s);
		return -1;
	}

	return cs->store(attr, val, expires,cs->data);

}

int cache_remove(str* memcache_system, str* attr)
{
	memcache_t* cs;

	if(memcache_system == NULL || attr == NULL)
	{
		LM_ERR("null arguments\n");
		return -1;
	}

	cs = lookup_memcache(*memcache_system);
	if(cs == NULL)
	{
		LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
				" this name registered\n",
				memcache_system->len,memcache_system->s);
		return -1;
	}
	cs->remove(attr,cs->data);

	return 1;
}

int cache_fetch(str* memcache_system, str* attr, str* val)
{
	memcache_t* cs;

	if(memcache_system == NULL || attr == NULL )
	{
		LM_ERR("null arguments\n");
		return -1;
	}

	cs = lookup_memcache(*memcache_system);
	if(cs == NULL)
	{
		LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
				" this name registered\n",
				memcache_system->len,memcache_system->s);
		return -1;
	}
	

	return cs->fetch(attr, val,cs->data);
}

int cache_increment(str *memcache_system, str* attr){
    
    memcache_t* cs;

    if(memcache_system == NULL || attr == NULL){
        LM_ERR("null arguments\n");
        return -1;
    }

    cs = lookup_memcache(*memcache_system);
    if(cs == NULL){
        LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
                                " this name registered\n",
                                memcache_system->len, memcache_system->s);
        return -1;
    }
    cs->increment(attr, cs->data);
    
}


int cache_decrement(str *memcache_system, str* attr){
    
    memcache_t* cs;

    if(memcache_system == NULL || attr == NULL){
        LM_ERR("null arguments\n");
        return -1;
    }

    cs = lookup_memcache(*memcache_system);
    if(cs == NULL){
        LM_ERR("Wrong argument <%.*s> - no memory memcache system with"
                                " this name registered\n",
                                memcache_system->len, memcache_system->s);
        return -1;
    }
   
    cs->decrement(attr, cs->data);
    
}
_______________________________________________
Devel mailing list
[email protected]
http://lists.opensips.org/cgi-bin/mailman/listinfo/devel

Reply via email to