Hello, Limit number of unprocessed syncrepl events in queue to 100.
All syncrepl events are processed sequentialy. This patch limits memory consumption in cases where the LDAP server is sending syncrepl events too quickly. LDAP client library should handle this situation so no events will be lost. This patch should go to the head of future master branch (rbtdb.v22). -- Petr^2 Spacek
From ebd5264baf76f86240d45e8024aaac44e677b941 Mon Sep 17 00:00:00 2001 From: Petr Spacek <pspa...@redhat.com> Date: Tue, 7 Jan 2014 17:25:43 +0100 Subject: [PATCH] Limit number of unprocessed syncrepl events in queue to 100. All syncrepl events are processed sequentialy. This patch limits memory consumption in cases where the LDAP server is sending syncrepl events too quickly. LDAP client library should handle this situation so no events will be lost. Signed-off-by: Petr Spacek <pspa...@redhat.com> --- src/ldap_helper.c | 10 ++++++++-- src/semaphore.c | 20 +++++++++++++++++++- src/semaphore.h | 1 + src/syncrepl.c | 36 ++++++++++++++++++++++++++++++++++++ src/syncrepl.h | 6 ++++++ 5 files changed, 70 insertions(+), 3 deletions(-) diff --git a/src/ldap_helper.c b/src/ldap_helper.c index a3acf82b218876395c56c2b85c462bfd2178847e..25f6aff4d8ed33b6067b4c1f75399f17e85d64dc 100644 --- a/src/ldap_helper.c +++ b/src/ldap_helper.c @@ -3910,6 +3910,7 @@ update_zone(isc_task_t *task, isc_event_t *event) } cleanup: + sync_concurr_limit_signal(inst->sctx); if (result != ISC_R_SUCCESS) log_error_r("update_zone (syncrepl) failed for '%s'. " "Zones can be outdated, run `rndc reload`", @@ -3944,6 +3945,7 @@ update_config(isc_task_t *task, isc_event_t *event) CHECK(ldap_parse_configentry(entry, inst)); cleanup: + sync_concurr_limit_signal(inst->sctx); if (result != ISC_R_SUCCESS) log_error_r("update_config (syncrepl) failed for '%s'. " "Configuration can be outdated, run `rndc reload`", @@ -4195,6 +4197,7 @@ cleanup: pevent->dn, pevent->chgtype); } + sync_concurr_limit_signal(inst->sctx); if (zone_ptr != NULL) dns_zone_detach(&zone_ptr); if (dns_name_dynamic(&name)) @@ -4394,6 +4397,7 @@ cleanup: if (result != ISC_R_SUCCESS) { log_error_r("syncrepl_update failed for object '%s'", entry->dn); + sync_concurr_limit_signal(inst->sctx); if (dbname != NULL) isc_mem_free(mctx, dbname); @@ -4508,19 +4512,21 @@ int ldap_sync_search_entry ( if (inst->exiting) return LDAP_SUCCESS; - + sync_concurr_limit_wait(inst->sctx); CHECK(ldap_entry_create(inst->mctx, ls->ls_ld, msg, &entry)); syncrepl_update(inst, entry, phase); #ifdef RBTDB_DEBUG if (++count % 100 == 0) log_info("ldap_sync_search_entry: %u entries read; inuse: %zd", count, isc_mem_inuse(inst->mctx)); #endif cleanup: - if (result != ISC_R_SUCCESS) + if (result != ISC_R_SUCCESS) { log_error_r("ldap_sync_search_entry failed"); + sync_concurr_limit_signal(inst->sctx); /* TODO: Add 'tainted' flag to the LDAP instance. */ + } /* Following return code will never reach upper layers. * It is limitation in ldap_sync_init() and ldap_sync_poll() diff --git a/src/semaphore.c b/src/semaphore.c index 352219f113a233218b5522beea5520dddbd748e6..b6ad7bf090467c29d749d7165e7122d0075131d7 100644 --- a/src/semaphore.c +++ b/src/semaphore.c @@ -82,7 +82,25 @@ semaphore_destroy(semaphore_t *sem) RUNTIME_CHECK(isc_condition_destroy(&sem->cond) == ISC_R_SUCCESS); } -/* +/** + * Wait on semaphore. This operation will try to acquire a lock on the + * semaphore. If the semaphore is already acquired as many times at it allows, + * the function will block until someone releases the lock. + */ +void +semaphore_wait(semaphore_t *sem) +{ + REQUIRE(sem != NULL); + + LOCK(&sem->mutex); + while (sem->value <= 0) + WAIT(&sem->cond, &sem->mutex); + sem->value--; + + UNLOCK(&sem->mutex); +} + +/** * Wait on semaphore. This operation will try to acquire a lock on the * semaphore. If the semaphore is already acquired as many times at it allows, * the function will block until someone releases the lock OR timeout expire. diff --git a/src/semaphore.h b/src/semaphore.h index 2c40e1cc91b4b9df1f37135bdfaac592532bc115..c96ebedcd8f388020c4ee28c5547af2bb1282a1d 100644 --- a/src/semaphore.h +++ b/src/semaphore.h @@ -46,6 +46,7 @@ typedef struct semaphore semaphore_t; /* Public functions. */ isc_result_t semaphore_init(semaphore_t *sem, int value) ATTR_NONNULLS; void semaphore_destroy(semaphore_t *sem) ATTR_NONNULLS; +void semaphore_wait(semaphore_t *sem) ATTR_NONNULLS; isc_result_t semaphore_wait_timed(semaphore_t *sem) ATTR_NONNULLS; void semaphore_signal(semaphore_t *sem) ATTR_NONNULLS; diff --git a/src/syncrepl.c b/src/syncrepl.c index d3a8ca681a1ed7d15b3b9e3f790bbc837546694e..ba7a79d6fa50752edd871165fe6f56c70650c7aa 100644 --- a/src/syncrepl.c +++ b/src/syncrepl.c @@ -28,11 +28,17 @@ #include "ldap_helper.h" #include "util.h" +#include "semaphore.h" #include "syncrepl.h" #include "zone_manager.h" #define LDAPDB_EVENT_SYNCREPL_BARRIER (LDAPDB_EVENTCLASS + 2) +/** How many unprocessed LDAP events from syncrepl can be in event queue. + * Adding new events into the queue is blocked until some events + * are processed. */ +#define LDAP_CONCURRENCY_LIMIT 100 + typedef struct task_element task_element_t; struct task_element { isc_task_t *task; @@ -72,6 +78,9 @@ struct task_element { struct sync_ctx { isc_refcount_t task_cnt; /**< provides atomic access */ isc_mem_t *mctx; + /** limit number of unprocessed LDAP events in queue + * (memory consumption is one of problems) */ + semaphore_t concurr_limit; isc_mutex_t mutex; /**< guards rest of the structure */ isc_condition_t cond; /**< for signal when task_cnt == 0 */ @@ -199,6 +208,8 @@ sync_ctx_init(isc_mem_t *mctx, isc_task_t *task, sync_ctx_t **sctxp) { sctx->state = sync_init; CHECK(sync_task_add(sctx, task)); + CHECK(semaphore_init(&sctx->concurr_limit, LDAP_CONCURRENCY_LIMIT)); + *sctxp = sctx; return ISC_R_SUCCESS; @@ -350,3 +361,28 @@ cleanup: isc_event_free(&ev); return result; } + +/** + * Wait until there is a free slot in syncrepl 'queue' - this limits number + * of unprocessed ISC events to #LDAP_CONCURRENCY_LIMIT. + * + * End of syncrepl event processing has to be signalled by + * sync_concurr_limit_signal() call. + */ +void +sync_concurr_limit_wait(sync_ctx_t *sctx) { + REQUIRE(sctx != NULL); + + semaphore_wait(&sctx->concurr_limit); +} + +/** + * Signal that syncrepl event was processed and the slot in concurrency limit + * can be freed. + */ +void +sync_concurr_limit_signal(sync_ctx_t *sctx) { + REQUIRE(sctx != NULL); + + semaphore_signal(&sctx->concurr_limit); +} diff --git a/src/syncrepl.h b/src/syncrepl.h index c0f707c96e3a66bf2cb9013182911cc7666783bb..2012a9bbe4eb726842500fbdebc6d471d99123db 100644 --- a/src/syncrepl.h +++ b/src/syncrepl.h @@ -60,4 +60,10 @@ sync_task_add(sync_ctx_t *sctx, isc_task_t *task); isc_result_t sync_barrier_wait(sync_ctx_t *sctx, const char *inst_name); +void ATTR_NONNULLS +sync_concurr_limit_wait(sync_ctx_t *sctx); + +void ATTR_NONNULLS +sync_concurr_limit_signal(sync_ctx_t *sctx); + #endif /* SYNCREPL_H_ */ -- 1.8.3.1
_______________________________________________ Freeipa-devel mailing list Freeipa-devel@redhat.com https://www.redhat.com/mailman/listinfo/freeipa-devel