Author: grothoff Date: 2008-02-20 23:51:29 -0700 (Wed, 20 Feb 2008) New Revision: 6401
Modified: GNUnet/src/applications/fs/ecrs/search.c GNUnet/src/applications/fs/gap/fs.c GNUnet/src/applications/fs/gap/gap.c GNUnet/src/applications/fs/gap/querymanager.c GNUnet/src/applications/fs/gap/querymanager.h GNUnet/src/applications/fs/gap/shared.h GNUnet/src/include/fs.h Log: better async processing Modified: GNUnet/src/applications/fs/ecrs/search.c =================================================================== --- GNUnet/src/applications/fs/ecrs/search.c 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/ecrs/search.c 2008-02-21 06:51:29 UTC (rev 6401) @@ -330,7 +330,10 @@ GNUNET_EC_file_block_check_and_get_query (size, (const DBlock *) &value[1], GNUNET_YES, &query)) - return GNUNET_SYSERR; + { + GNUNET_GE_BREAK(NULL, 0); + return GNUNET_SYSERR; + } if (!((0 == memcmp (&query, (GNUNET_HashCode *) & ps[1], sizeof (GNUNET_HashCode))) && ((ps->type == type) || (ps->type == GNUNET_ECRS_BLOCKTYPE_ANY)) @@ -341,7 +344,9 @@ ps->keyCount, (GNUNET_HashCode *) & ps[1])))) - return GNUNET_OK; /* not a match */ + { + return GNUNET_OK; /* not a match */ + } switch (type) { @@ -355,7 +360,10 @@ int j; if (size < sizeof (KBlock)) - return GNUNET_SYSERR; + { + GNUNET_GE_BREAK(NULL, 0); + return GNUNET_SYSERR; + } kb = GNUNET_malloc (size); memcpy (kb, &value[1], size); #if DEBUG_SEARCH Modified: GNUnet/src/applications/fs/gap/fs.c =================================================================== --- GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/gap/fs.c 2008-02-21 06:51:29 UTC (rev 6401) @@ -392,6 +392,8 @@ { struct GNUNET_ClientHandle *sock; struct ResponseList *seen; + unsigned int processed; + int have_more; }; /** @@ -413,7 +415,14 @@ GNUNET_DatastoreValue *enc; const GNUNET_DatastoreValue *use; unsigned int type; + int ret; + if (cls->processed > MAX_SYNC_PROCESSED) + { + cls->have_more = GNUNET_YES; + return GNUNET_SYSERR; + } + cls->processed++; size = ntohl (value->size) - sizeof 
(GNUNET_DatastoreValue); dblock = (const DBlock *) &value[1]; enc = NULL; @@ -435,11 +444,12 @@ memcpy (&msg[1], dblock, size); type = ntohl (dblock->type); GNUNET_free_non_null (enc); - coreAPI->cs_send_to_client (sock, &msg->header, - type != GNUNET_ECRS_BLOCKTYPE_DATA ? GNUNET_NO : GNUNET_YES); + ret = coreAPI->cs_send_to_client (sock, &msg->header, + type != GNUNET_ECRS_BLOCKTYPE_DATA ? GNUNET_NO : GNUNET_YES); GNUNET_free (msg); - if (type == GNUNET_ECRS_BLOCKTYPE_DATA) - return GNUNET_SYSERR; /* unique response */ + if ( (type == GNUNET_ECRS_BLOCKTYPE_DATA) || + (ret != GNUNET_OK) ) + return GNUNET_SYSERR; /* unique response or client can take no more*/ rl = GNUNET_malloc (sizeof (struct ResponseList)); GNUNET_hash (dblock, size, &rl->hash); rl->next = cls->seen; @@ -485,6 +495,8 @@ #endif fpp.sock = sock; fpp.seen = NULL; + fpp.have_more = GNUNET_NO; + fpp.processed = 0; if (type == GNUNET_ECRS_BLOCKTYPE_DATA) { if ((1 == datastore->get (&rs->query[0], @@ -505,7 +517,8 @@ GNUNET_FS_QUERYMANAGER_start_query (&rs->query[0], keyCount, anonymityLevel, type, sock, have_target ? &rs->target : NULL, - fpp.seen); + fpp.seen, + fpp.have_more); CLEANUP: while (fpp.seen != NULL) { Modified: GNUnet/src/applications/fs/gap/gap.c =================================================================== --- GNUnet/src/applications/fs/gap/gap.c 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/gap/gap.c 2008-02-21 06:51:29 UTC (rev 6401) @@ -43,12 +43,6 @@ #define MAX_ENTRIES_PER_SLOT 2 /** - * If, after finding local results, we abort a GET - * iteration, we increment "have_more" by this value. - */ -#define HAVE_MORE_INCREMENT 5 - -/** * How often do we check have_more? 
*/ #define HAVE_MORE_FREQUENCY (100 * GNUNET_CRON_MILLISECONDS) Modified: GNUnet/src/applications/fs/gap/querymanager.c =================================================================== --- GNUnet/src/applications/fs/gap/querymanager.c 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/gap/querymanager.c 2008-02-21 06:51:29 UTC (rev 6401) @@ -64,17 +64,18 @@ }; -static GNUNET_CoreAPIForPlugins *coreAPI; - /** * List of all clients, their active requests and other * per-client information. */ static struct ClientDataList *clients; +static GNUNET_CoreAPIForPlugins * coreAPI; static GNUNET_Stats_ServiceAPI *stats; +static GNUNET_Datastore_ServiceAPI *datastore; + static int stat_gap_client_query_received; static int stat_gap_client_response_sent; @@ -131,7 +132,8 @@ unsigned int type, struct GNUNET_ClientHandle *client, const GNUNET_PeerIdentity * target, - const struct ResponseList *seen) + const struct ResponseList *seen, + int have_more) { struct ClientDataList *cl; struct RequestList *request; @@ -153,6 +155,8 @@ request->primary_target = GNUNET_FS_PT_intern (target); request->response_client = client; request->policy = GNUNET_FS_RoutingPolicy_ALL; + if (have_more != GNUNET_NO) + request->have_more = HAVE_MORE_INCREMENT; memcpy (&request->queries[0], query, sizeof (GNUNET_HashCode) * key_count); if (seen != NULL) { @@ -275,13 +279,13 @@ msg->expirationTime = GNUNET_htonll (expirationTime); memcpy (&msg[1], data, size); coreAPI->cs_send_to_client (client, - &msg->header, - (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA) - ? GNUNET_NO : GNUNET_YES); + &msg->header, + (rl->type != GNUNET_ECRS_BLOCKTYPE_DATA) + ? 
GNUNET_NO : GNUNET_YES); if (stats != NULL) stats->change (stat_gap_client_response_sent, 1); GNUNET_free (msg); - + /* update *value */ *value += 1 + rl->value; GNUNET_FS_PLAN_success (sender, client, 0, rl); @@ -431,13 +435,71 @@ GNUNET_mutex_unlock (GNUNET_FS_lock); } + +struct HMClosure +{ + struct RequestList * request; + unsigned int processed; + int have_more; +}; + /** + * Any response that we get should be passed + * back to the client. If the response is unique, + * we should abort the iteration (return GNUNET_SYSERR). + */ +static int +have_more_processor (const GNUNET_HashCode * key, + const GNUNET_DatastoreValue * + value, void *closure, unsigned long long uid) +{ + struct HMClosure *cls = closure; + const DBlock *dblock; + GNUNET_HashCode hc; + CS_fs_reply_content_MESSAGE *msg; + unsigned int size; + int ret; + + size = ntohl (value->size) - sizeof (GNUNET_DatastoreValue); + dblock = (const DBlock *) &value[1]; + if (GNUNET_OK == GNUNET_FS_SHARED_test_valid_new_response (cls->request, + key, + size, + dblock, &hc)) + { + msg = GNUNET_malloc (sizeof (CS_fs_reply_content_MESSAGE) + size); + msg->header.type = htons (GNUNET_CS_PROTO_GAP_RESULT); + msg->header.size = htons (sizeof (CS_fs_reply_content_MESSAGE) + size); + msg->anonymityLevel = value->anonymityLevel; + msg->expirationTime = value->expirationTime; + memcpy (&msg[1], dblock, size); + ret = coreAPI->cs_send_to_client (cls->request->response_client, + &msg->header, + GNUNET_YES); + GNUNET_free (msg); + if (ret != GNUNET_OK) + return GNUNET_SYSERR; /* client can take no more */ + GNUNET_FS_SHARED_mark_response_seen(cls->request, + &hc); + } + cls->processed++; + if (cls->processed > MAX_ASYNC_PROCESSED) + { + cls->have_more = GNUNET_YES; + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Cron-job to periodically check if we should + * repeat requests. 
*/ static void repeat_requests_job (void *unused) { + struct HMClosure hmc; struct ClientDataList *client; struct RequestList *request; GNUNET_CronTime now; @@ -450,27 +512,40 @@ request = client->requests; while (request != NULL) { - if ((NULL == request->plan_entries) && - ((client->client != NULL) || - (request->expiration > now)) && - (request->last_ttl_used * GNUNET_CRON_SECONDS + - request->last_request_time < now)) - { - if ((GNUNET_OK == - GNUNET_FS_PLAN_request (client->client, 0, request)) - && (stats != NULL)) - stats->change (stat_gap_client_query_injected, 1); - } - - if ((request->anonymityLevel == 0) && - (request->last_dht_get + request->dht_back_off < now)) - { - if (request->dht_back_off * 2 > request->dht_back_off) - request->dht_back_off *= 2; - request->last_dht_get = now; - GNUNET_FS_DHT_execute_query (request->type, - &request->queries[0]); - } + if (request->have_more > 0) + { + request->have_more--; + hmc.request = request; + hmc.processed = 0; + hmc.have_more = GNUNET_NO; + datastore->get (&request->queries[0], request->type, &have_more_processor, &hmc); + if (hmc.have_more) + request->have_more += HAVE_MORE_INCREMENT; + } + else + { + if ((NULL == request->plan_entries) && + ((client->client != NULL) || + (request->expiration > now)) && + (request->last_ttl_used * GNUNET_CRON_SECONDS + + request->last_request_time < now)) + { + if ((GNUNET_OK == + GNUNET_FS_PLAN_request (client->client, 0, request)) + && (stats != NULL)) + stats->change (stat_gap_client_query_injected, 1); + } + + if ((request->anonymityLevel == 0) && + (request->last_dht_get + request->dht_back_off < now)) + { + if (request->dht_back_off * 2 > request->dht_back_off) + request->dht_back_off *= 2; + request->last_dht_get = now; + GNUNET_FS_DHT_execute_query (request->type, + &request->queries[0]); + } + } request = request->next; } client = client->next; @@ -485,10 +560,7 @@ GNUNET_GE_ASSERT (capi->ectx, GNUNET_SYSERR != capi->cs_exit_handler_register 
(&handle_client_exit)); - GNUNET_cron_add_job (capi->cron, - &repeat_requests_job, - CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL); - + datastore = capi->request_service ("datastore"); stats = capi->request_service ("stats"); if (stats != NULL) { @@ -504,6 +576,9 @@ stats-> create (gettext_noop ("# gap query bloomfilter resizing updates")); } + GNUNET_cron_add_job (capi->cron, + &repeat_requests_job, + CHECK_REPEAT_FREQUENCY, CHECK_REPEAT_FREQUENCY, NULL); return 0; } @@ -518,6 +593,8 @@ cs_exit_handler_unregister (&handle_client_exit)); while (clients != NULL) handle_client_exit (clients->client); + coreAPI->release_service (datastore); + datastore = NULL; if (stats != NULL) { coreAPI->release_service (stats); Modified: GNUnet/src/applications/fs/gap/querymanager.h =================================================================== --- GNUnet/src/applications/fs/gap/querymanager.h 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/gap/querymanager.h 2008-02-21 06:51:29 UTC (rev 6401) @@ -42,6 +42,7 @@ * client disconnects. * * @param target peer known to have the content, maybe NULL. + * @param have_more do we have more results in our local datastore? 
*/ void GNUNET_FS_QUERYMANAGER_start_query (const GNUNET_HashCode * query, @@ -50,7 +51,8 @@ unsigned int type, struct GNUNET_ClientHandle *client, const GNUNET_PeerIdentity * target, - const struct ResponseList *seen); + const struct ResponseList *seen, + int have_more); /** * Handle the given response (by forwarding it to Modified: GNUnet/src/applications/fs/gap/shared.h =================================================================== --- GNUnet/src/applications/fs/gap/shared.h 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/applications/fs/gap/shared.h 2008-02-21 06:51:29 UTC (rev 6401) @@ -286,6 +286,7 @@ */ extern struct GNUNET_Mutex *GNUNET_FS_lock; + /** * Free the request list, including the associated * list of pending requests, its entries in the Modified: GNUnet/src/include/fs.h =================================================================== --- GNUnet/src/include/fs.h 2008-02-21 06:50:11 UTC (rev 6400) +++ GNUnet/src/include/fs.h 2008-02-21 06:51:29 UTC (rev 6401) @@ -440,8 +440,23 @@ */ #define GNUNET_GAP_ESTIMATED_DATA_SIZE (32 * 1024) +/** + * If, after finding local results, we abort a GET + * iteration, we increment "have_more" by this value. + */ +#define HAVE_MORE_INCREMENT 2 +/** + * What is the maximum number of local results + * that we are willing to return synchronously? + */ +#define MAX_SYNC_PROCESSED 8 +/** + * What is the maximum number of local results + * that we are willing to return asynchronously? + */ +#define MAX_ASYNC_PROCESSED 32 _______________________________________________ GNUnet-SVN mailing list GNUnet-SVN@gnu.org http://lists.gnu.org/mailman/listinfo/gnunet-svn