Since no one seemed to object, I have committed this patch to trunk in
r428931.
-Paul
Paul Querna wrote:
> Attached is a patch to add support for multiple parallel fetching of
> keys from a memcache cluster. It enables fetching of thousands of
> values from multiple memcache nodes without significant slowdown as
> more nodes or keys are added.
>
> The basic logic behind the main function, apr_memcache_multgetp, is
> currently pounding away with thousands of concurrent queries per second,
> without any problem. I wouldn't call the function itself 'pretty', but
> I believe the logic embedded in it is correct.
>
> [[[
> Add memcache multi-get support to apr_memcache.
>
> * include/apr_memcache.h
> Add _value_t structure for holding an individual value from
> memcached.
> Add new functions: apr_memcache_add_multget_key,
> apr_memcache_multgetp.
>
> * memcache/apr_memcache.c:
> Add a local baton structure for associating servers with queries.
> Add new functions: apr_memcache_add_multget_key,
> mget_conn_result, apr_memcache_multgetp
>
> Submitted By: Rob Emanuele <rob.emanuele ask.com>, Paul Querna
> <paul.querna ask.com>
> ]]]
>
>
> ------------------------------------------------------------------------
>
> Index: memcache/apr_memcache.c
> ===================================================================
> --- memcache/apr_memcache.c (revision 428526)
> +++ memcache/apr_memcache.c (working copy)
> @@ -17,6 +17,7 @@
> */
>
> #include "apr_memcache.h"
> +#include "apr_poll.h"
> #include "apr_version.h"
> #include <stdlib.h>
>
> @@ -38,6 +39,9 @@
> #define MC_EOL "\r\n"
> #define MC_EOL_LEN (sizeof(MC_EOL)-1)
>
> +#define MC_WS " "
> +#define MC_WS_LEN (sizeof(MC_WS)-1)
> +
> #define MC_GET "get "
> #define MC_GET_LEN (sizeof(MC_GET)-1)
>
> @@ -97,7 +101,16 @@
> #define MS_END "END"
> #define MS_END_LEN (sizeof(MS_END)-1)
>
> +/** Server and Query Structure for a multiple get */
> +struct cache_server_query_t {
> + apr_memcache_server_t* ms;
> + apr_memcache_conn_t* conn;
> + struct iovec* query_vec;
> + unsigned int query_vec_count;
> +};
>
> +#define MULT_GET_TIMEOUT 50000
> +
> static apr_status_t make_server_dead(apr_memcache_t *mc,
> apr_memcache_server_t *ms)
> {
> #if APR_HAS_THREADS
> @@ -1021,6 +1034,334 @@
> }
>
>
> +APR_DECLARE(void)
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> + const char* key,
> + apr_hash_t **values)
> +{
> + apr_memcache_value_t* value;
> + int klen = strlen(key);
> +
> + // create the value hash if need be
> + if (!*values) {
> + *values = apr_hash_make(data_pool);
> + }
> +
> + // init key and add it to the value hash
> + value = apr_pcalloc(data_pool, sizeof(apr_memcache_value_t));
> +
> + value->status = APR_NOTFOUND;
> + value->key = apr_pstrdup(data_pool, key);
> +
> + apr_hash_set(*values, value->key, klen, value);
> +}
> +
> +static void mget_conn_result(int up,
> + apr_status_t rv,
> + apr_memcache_t *mc,
> + apr_memcache_server_t *ms,
> + apr_memcache_conn_t *conn,
> + struct cache_server_query_t *server_query,
> + apr_hash_t *values,
> + apr_hash_t *server_queries)
> +{
> + int j;
> + apr_memcache_value_t* value;
> +
> + if (!up) {
> + ms_bad_conn(ms, conn);
> + apr_memcache_disable_server(mc, ms);
> + }
> +
> + for (j = 1; j < server_query->query_vec_count ; j+=2) {
> + if (server_query->query_vec[j].iov_base) {
> + value = apr_hash_get(values, server_query->query_vec[j].iov_base,
> +
> strlen(server_query->query_vec[j].iov_base));
> +
> + if (value->status == APR_NOTFOUND) {
> + value->status = rv;
> + }
> + }
> + }
> +
> + ms_release_conn(ms, conn);
> +
> + apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +}
> +
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> + apr_pool_t *temp_pool,
> + apr_pool_t *data_pool,
> + apr_hash_t *values)
> +{
> + apr_status_t rv;
> + apr_memcache_server_t* ms;
> + apr_memcache_conn_t* conn;
> + apr_uint32_t hash;
> + apr_size_t written;
> + int klen;
> +
> + apr_memcache_value_t* value;
> + apr_hash_index_t* value_hash_index;
> +
> + /* this is a little over aggresive, but beats multiple loops
> + * to figure out how long each vector needs to be per-server.
> + */
> + unsigned int veclen = 2 + 2 * apr_hash_count(values) - 1; /* get
> <key>[<space><key>...]\r\n */
> + unsigned int i, j;
> + unsigned int queries_sent;
> + apr_int32_t queries_recvd;
> +
> + apr_hash_t * server_queries = apr_hash_make(temp_pool);
> + struct cache_server_query_t* server_query;
> + apr_hash_index_t * query_hash_index;
> +
> + apr_pollset_t* pollset;
> + const apr_pollfd_t* activefds;
> + apr_pollfd_t* pollfds;
> +
> +
> + // build all the queries
> + value_hash_index = apr_hash_first(temp_pool, values);
> + while (value_hash_index) {
> + apr_hash_this(value_hash_index, NULL, NULL, (void**)&value);
> + value_hash_index = apr_hash_next(value_hash_index);
> + klen = strlen(value->key);
> +
> + hash = apr_memcache_hash(value->key, klen);
> + ms = apr_memcache_find_server_hash(mc, hash);
> + if (ms == NULL) {
> + continue;
> + }
> +
> + server_query = apr_hash_get(server_queries, &ms, sizeof(ms));
> +
> + if (!server_query) {
> + rv = ms_find_conn(ms, &conn);
> +
> + if (rv != APR_SUCCESS) {
> + apr_memcache_disable_server(mc, ms);
> + value->status = rv;
> + continue;
> + }
> +
> + server_query = apr_pcalloc(temp_pool,sizeof(struct
> cache_server_query_t));
> +
> + apr_hash_set(server_queries, &ms, sizeof(ms), server_query);
> +
> + server_query->ms = ms;
> + server_query->conn = conn;
> + server_query->query_vec = apr_pcalloc(temp_pool, sizeof(struct
> iovec)*veclen);
> +
> + // set up the first key
> + server_query->query_vec[0].iov_base = MC_GET;
> + server_query->query_vec[0].iov_len = MC_GET_LEN;
> +
> + server_query->query_vec[1].iov_base = (void*)(value->key);
> + server_query->query_vec[1].iov_len = klen;
> +
> + server_query->query_vec[2].iov_base = MC_EOL;
> + server_query->query_vec[2].iov_len = MC_EOL_LEN;
> +
> + server_query->query_vec_count = 3;
> + }
> + else {
> + j = server_query->query_vec_count - 1;
> +
> + server_query->query_vec[j].iov_base = MC_WS;
> + server_query->query_vec[j].iov_len = MC_WS_LEN;
> + j++;
> +
> + server_query->query_vec[j].iov_base = (void*)(value->key);
> + server_query->query_vec[j].iov_len = klen;
> + j++;
> +
> + server_query->query_vec[j].iov_base = MC_EOL;
> + server_query->query_vec[j].iov_len = MC_EOL_LEN;
> + j++;
> +
> + server_query->query_vec_count = j;
> + }
> + }
> +
> + // create polling structures
> + pollfds = apr_pcalloc(temp_pool, apr_hash_count(server_queries) *
> sizeof(apr_pollfd_t));
> +
> + rv = apr_pollset_create(&pollset, apr_hash_count(server_queries),
> temp_pool, 0);
> +
> + if (rv != APR_SUCCESS) {
> + return rv;
> + }
> +
> + // send all the queries
> + queries_sent = 0;
> + query_hash_index = apr_hash_first(temp_pool, server_queries);
> +
> + while (query_hash_index) {
> + apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> + query_hash_index = apr_hash_next(query_hash_index);
> +
> + conn = server_query->conn;
> + ms = server_query->ms;
> +
> + for (i = 0, rv = APR_SUCCESS; i < veclen && rv == APR_SUCCESS; i +=
> IOV_MAX) {
> + rv = apr_socket_sendv(conn->sock, &(server_query->query_vec[i]),
> + veclen-i>IOV_MAX ? IOV_MAX : veclen-i ,
> &written);
> + }
> +
> + if (rv != APR_SUCCESS) {
> + mget_conn_result(FALSE, rv, mc, ms, conn,
> + server_query, values, server_queries);
> + continue;
> + }
> +
> + pollfds[queries_sent].desc_type = APR_POLL_SOCKET;
> + pollfds[queries_sent].reqevents = APR_POLLIN;
> + pollfds[queries_sent].p = temp_pool;
> + pollfds[queries_sent].desc.s = conn->sock;
> + pollfds[queries_sent].client_data = (void *)server_query;
> + apr_pollset_add (pollset, &pollfds[queries_sent]);
> +
> + queries_sent++;
> + }
> +
> + while (queries_sent) {
> + rv = apr_pollset_poll(pollset, MULT_GET_TIMEOUT, &queries_recvd,
> &activefds);
> +
> + if (rv != APR_SUCCESS) {
> + // timeout
> + queries_sent = 0;
> + continue;
> + }
> + for (i = 0; i < queries_recvd; i++) {
> + server_query = activefds[i].client_data;
> + conn = server_query->conn;
> + ms = server_query->ms;
> +
> + rv = get_server_line(conn);
> +
> + if (rv != APR_SUCCESS) {
> + apr_pollset_remove (pollset, &activefds[i]);
> + mget_conn_result(FALSE, rv, mc, ms, conn,
> + server_query, values, server_queries);
> + queries_sent--;
> + continue;
> + }
> +
> + if (strncmp(MS_VALUE, conn->buffer, MS_VALUE_LEN) == 0) {
> + char *key;
> + char *flags;
> + char *length;
> + char *start;
> + char *last;
> + char *data;
> + apr_size_t len;
> +
> + start = conn->buffer;
> + key = apr_strtok(conn->buffer, " ", &last); // just the
> VALUE, ignore
> + key = apr_strtok(NULL, " ", &last);
> + flags = apr_strtok(NULL, " ", &last);
> +
> +
> + length = apr_strtok(NULL, " ", &last);
> + len = atoi(length);
> +
> + value = apr_hash_get(values, key, strlen(key));
> +
> +
> + if (value) {
> + if (len > 0) {
> + apr_bucket_brigade *bbb;
> + apr_bucket *e;
> +
> + /* eat the trailing \r\n */
> + rv = apr_brigade_partition(conn->bb, len+2, &e);
> +
> + if (rv != APR_SUCCESS) {
> + apr_pollset_remove (pollset, &activefds[i]);
> + mget_conn_result(FALSE, rv, mc, ms, conn,
> + server_query, values,
> server_queries);
> + queries_sent--;
> + continue;
> + }
> +
> + bbb = apr_brigade_split(conn->bb, e);
> +
> + rv = apr_brigade_pflatten(conn->bb, &data, &len,
> data_pool);
> +
> + if (rv != APR_SUCCESS) {
> + apr_pollset_remove (pollset, &activefds[i]);
> + mget_conn_result(FALSE, rv, mc, ms, conn,
> + server_query, values,
> server_queries);
> + queries_sent--;
> + continue;
> + }
> +
> + rv = apr_brigade_destroy(conn->bb);
> + if (rv != APR_SUCCESS) {
> + apr_pollset_remove (pollset, &activefds[i]);
> + mget_conn_result(FALSE, rv, mc, ms, conn,
> + server_query, values,
> server_queries);
> + queries_sent--;
> + continue;
> + }
> +
> + conn->bb = bbb;
> +
> + value->len = len - 2;
> + data[value->len] = '\0';
> + value->data = data;
> + }
> +
> + value->status = rv;
> + value->flags = atoi(flags);
> +
> + // stay on the server
> + i--;
> +
> + }
> + else {
> + // TODO: Server Sent back a key I didn't ask for or my
> hash is corrupt
> + }
> + }
> + else if (strncmp(MS_END, conn->buffer, MS_END_LEN) == 0) {
> + // this connection is done
> + ms_release_conn(ms, conn);
> + apr_hash_set(server_queries, &ms, sizeof(ms), NULL);
> +
> + apr_pollset_remove (pollset, &activefds[i]);
> + queries_sent--;
> + }
> + else {
> + /* unknown reply? */
> + rv = APR_EGENERAL;
> + }
> +
> + } /* /for */
> + } /* /while */
> +
> + query_hash_index = apr_hash_first(temp_pool, server_queries);
> + while (query_hash_index) {
> + apr_hash_this(query_hash_index, NULL, NULL, (void**)&server_query);
> + query_hash_index = apr_hash_next(query_hash_index);
> +
> + conn = server_query->conn;
> + ms = server_query->ms;
> +
> + mget_conn_result(TRUE, rv, mc, ms, conn,
> + server_query, values, server_queries);
> + continue;
> + }
> +
> + apr_pool_clear(temp_pool);
> + apr_pollset_destroy(pollset);
> + return APR_SUCCESS;
> +
> +}
> +
> +
> +
> /**
> * Define all of the strings for stats
> */
> Index: include/apr_memcache.h
> ===================================================================
> --- include/apr_memcache.h (revision 428526)
> +++ include/apr_memcache.h (working copy)
> @@ -35,6 +35,7 @@
> #include "apr_ring.h"
> #include "apr_buckets.h"
> #include "apr_reslist.h"
> +#include "apr_hash.h"
>
> #ifdef __cplusplus
> extern "C" {
> @@ -85,6 +86,16 @@
> apr_pool_t *p; /** Pool to use for allocations */
> } apr_memcache_t;
>
> +/** Returned Data from a multiple get */
> +typedef struct
> +{
> + apr_status_t status;
> + const char* key;
> + apr_size_t len;
> + char *data;
> + apr_uint16_t flags;
> +} apr_memcache_value_t;
> +
> /**
> * Creates a crc32 hash used to split keys between servers
> * @param data Data to be hashed
> @@ -194,7 +205,37 @@
> apr_size_t *len,
> apr_uint16_t *flags);
>
> +
> /**
> + * Add a key to a hash for a multiget query
> + * if the hash (*value) is NULL it will be created
> + * @param data_pool pool from where the hash and their items are created from
> + * @param key null terminated string containing the key
> + * @param values hash of keys and values that this key will be added to
> + * @return
> + */
> +APR_DECLARE(void)
> +apr_memcache_add_multget_key(apr_pool_t *data_pool,
> + const char* key,
> + apr_hash_t **values);
> +
> +/**
> + * Gets multiple values from the server, allocating the values out of p
> + * @param mc client to use
> + * @param temp_pool Pool used for tempoary allocations. May be cleared
> inside this
> + * call.
> + * @param data_pool Pool used to allocate data for the returned values.
> + * @param values hash of apr_memcache_value_t keyed by strings, contains the
> + * result of the multiget call.
> + * @return
> + */
> +APR_DECLARE(apr_status_t)
> +apr_memcache_multgetp(apr_memcache_t *mc,
> + apr_pool_t *temp_pool,
> + apr_pool_t *data_pool,
> + apr_hash_t *values);
> +
> +/**
> * Sets a value by key on the server
> * @param mc client to use
> * @param key null terminated string containing the key