Hello,

Limit number of unprocessed syncrepl events in queue to 100.

All syncrepl events are processed sequentialy. This patch limits
memory consumption in cases where the LDAP server is sending
syncrepl events too quickly.

LDAP client library should handle this situation so no events
will be lost.


This patch should go to the head of future master branch (rbtdb.v22).

--
Petr^2 Spacek
From ebd5264baf76f86240d45e8024aaac44e677b941 Mon Sep 17 00:00:00 2001
From: Petr Spacek <pspa...@redhat.com>
Date: Tue, 7 Jan 2014 17:25:43 +0100
Subject: [PATCH] Limit number of unprocessed syncrepl events in queue to 100.

All syncrepl events are processed sequentialy. This patch limits
memory consumption in cases where the LDAP server is sending
syncrepl events too quickly.

LDAP client library should handle this situation so no events
will be lost.

Signed-off-by: Petr Spacek <pspa...@redhat.com>
---
 src/ldap_helper.c | 10 ++++++++--
 src/semaphore.c   | 20 +++++++++++++++++++-
 src/semaphore.h   |  1 +
 src/syncrepl.c    | 36 ++++++++++++++++++++++++++++++++++++
 src/syncrepl.h    |  6 ++++++
 5 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/src/ldap_helper.c b/src/ldap_helper.c
index a3acf82b218876395c56c2b85c462bfd2178847e..25f6aff4d8ed33b6067b4c1f75399f17e85d64dc 100644
--- a/src/ldap_helper.c
+++ b/src/ldap_helper.c
@@ -3910,6 +3910,7 @@ update_zone(isc_task_t *task, isc_event_t *event)
 	}
 
 cleanup:
+	sync_concurr_limit_signal(inst->sctx);
 	if (result != ISC_R_SUCCESS)
 		log_error_r("update_zone (syncrepl) failed for '%s'. "
 			  "Zones can be outdated, run `rndc reload`",
@@ -3944,6 +3945,7 @@ update_config(isc_task_t *task, isc_event_t *event)
 	CHECK(ldap_parse_configentry(entry, inst));
 
 cleanup:
+	sync_concurr_limit_signal(inst->sctx);
 	if (result != ISC_R_SUCCESS)
 		log_error_r("update_config (syncrepl) failed for '%s'. "
 			    "Configuration can be outdated, run `rndc reload`",
@@ -4195,6 +4197,7 @@ cleanup:
 			  pevent->dn, pevent->chgtype);
 	}
 
+	sync_concurr_limit_signal(inst->sctx);
 	if (zone_ptr != NULL)
 		dns_zone_detach(&zone_ptr);
 	if (dns_name_dynamic(&name))
@@ -4394,6 +4397,7 @@ cleanup:
 	if (result != ISC_R_SUCCESS) {
 		log_error_r("syncrepl_update failed for object '%s'",
 			    entry->dn);
+		sync_concurr_limit_signal(inst->sctx);
 
 		if (dbname != NULL)
 			isc_mem_free(mctx, dbname);
@@ -4508,19 +4512,21 @@ int ldap_sync_search_entry (
 	if (inst->exiting)
 		return LDAP_SUCCESS;
 
-
+	sync_concurr_limit_wait(inst->sctx);
 	CHECK(ldap_entry_create(inst->mctx, ls->ls_ld, msg, &entry));
 	syncrepl_update(inst, entry, phase);
 #ifdef RBTDB_DEBUG
 	if (++count % 100 == 0)
 		log_info("ldap_sync_search_entry: %u entries read; inuse: %zd",
 			 count, isc_mem_inuse(inst->mctx));
 #endif
 
 cleanup:
-	if (result != ISC_R_SUCCESS)
+	if (result != ISC_R_SUCCESS) {
 		log_error_r("ldap_sync_search_entry failed");
+		sync_concurr_limit_signal(inst->sctx);
 		/* TODO: Add 'tainted' flag to the LDAP instance. */
+	}
 
 	/* Following return code will never reach upper layers.
 	 * It is limitation in ldap_sync_init() and ldap_sync_poll()
diff --git a/src/semaphore.c b/src/semaphore.c
index 352219f113a233218b5522beea5520dddbd748e6..b6ad7bf090467c29d749d7165e7122d0075131d7 100644
--- a/src/semaphore.c
+++ b/src/semaphore.c
@@ -82,7 +82,25 @@ semaphore_destroy(semaphore_t *sem)
 	RUNTIME_CHECK(isc_condition_destroy(&sem->cond) == ISC_R_SUCCESS);
 }
 
-/*
+/**
+ * Wait on semaphore. This operation will try to acquire a lock on the
+ * semaphore. If the semaphore is already acquired as many times at it allows,
+ * the function will block until someone releases the lock.
+ */
+void
+semaphore_wait(semaphore_t *sem)
+{
+	REQUIRE(sem != NULL);
+
+	LOCK(&sem->mutex);
+	while (sem->value <= 0)
+		WAIT(&sem->cond, &sem->mutex);
+	sem->value--;
+
+	UNLOCK(&sem->mutex);
+}
+
+/**
  * Wait on semaphore. This operation will try to acquire a lock on the
  * semaphore. If the semaphore is already acquired as many times at it allows,
  * the function will block until someone releases the lock OR timeout expire.
diff --git a/src/semaphore.h b/src/semaphore.h
index 2c40e1cc91b4b9df1f37135bdfaac592532bc115..c96ebedcd8f388020c4ee28c5547af2bb1282a1d 100644
--- a/src/semaphore.h
+++ b/src/semaphore.h
@@ -46,6 +46,7 @@ typedef struct semaphore	semaphore_t;
 /* Public functions. */
 isc_result_t	semaphore_init(semaphore_t *sem, int value) ATTR_NONNULLS;
 void		semaphore_destroy(semaphore_t *sem) ATTR_NONNULLS;
+void		semaphore_wait(semaphore_t *sem) ATTR_NONNULLS;
 isc_result_t	semaphore_wait_timed(semaphore_t *sem) ATTR_NONNULLS;
 void		semaphore_signal(semaphore_t *sem) ATTR_NONNULLS;
 
diff --git a/src/syncrepl.c b/src/syncrepl.c
index d3a8ca681a1ed7d15b3b9e3f790bbc837546694e..ba7a79d6fa50752edd871165fe6f56c70650c7aa 100644
--- a/src/syncrepl.c
+++ b/src/syncrepl.c
@@ -28,11 +28,17 @@
 
 #include "ldap_helper.h"
 #include "util.h"
+#include "semaphore.h"
 #include "syncrepl.h"
 #include "zone_manager.h"
 
 #define LDAPDB_EVENT_SYNCREPL_BARRIER	(LDAPDB_EVENTCLASS + 2)
 
+/** How many unprocessed LDAP events from syncrepl can be in event queue.
+ *  Adding new events into the queue is blocked until some events
+ *  are processed. */
+#define LDAP_CONCURRENCY_LIMIT 100
+
 typedef struct task_element task_element_t;
 struct task_element {
 	isc_task_t			*task;
@@ -72,6 +78,9 @@ struct task_element {
 struct sync_ctx {
 	isc_refcount_t			task_cnt; /**< provides atomic access */
 	isc_mem_t			*mctx;
+	/** limit number of unprocessed LDAP events in queue
+	 *  (memory consumption is one of problems) */
+	semaphore_t			concurr_limit;
 
 	isc_mutex_t			mutex;	/**< guards rest of the structure */
 	isc_condition_t			cond;	/**< for signal when task_cnt == 0 */
@@ -199,6 +208,8 @@ sync_ctx_init(isc_mem_t *mctx, isc_task_t *task, sync_ctx_t **sctxp) {
 	sctx->state = sync_init;
 	CHECK(sync_task_add(sctx, task));
 
+	CHECK(semaphore_init(&sctx->concurr_limit, LDAP_CONCURRENCY_LIMIT));
+
 	*sctxp = sctx;
 	return ISC_R_SUCCESS;
 
@@ -350,3 +361,28 @@ cleanup:
 		isc_event_free(&ev);
 	return result;
 }
+
+/**
+ * Wait until there is a free slot in syncrepl 'queue' - this limits number
+ * of unprocessed ISC events to #LDAP_CONCURRENCY_LIMIT.
+ *
+ * End of syncrepl event processing has to be signalled by
+ * sync_concurr_limit_signal() call.
+ */
+void
+sync_concurr_limit_wait(sync_ctx_t *sctx) {
+	REQUIRE(sctx != NULL);
+
+	semaphore_wait(&sctx->concurr_limit);
+}
+
+/**
+ * Signal that syncrepl event was processed and the slot in concurrency limit
+ * can be freed.
+ */
+void
+sync_concurr_limit_signal(sync_ctx_t *sctx) {
+	REQUIRE(sctx != NULL);
+
+	semaphore_signal(&sctx->concurr_limit);
+}
diff --git a/src/syncrepl.h b/src/syncrepl.h
index c0f707c96e3a66bf2cb9013182911cc7666783bb..2012a9bbe4eb726842500fbdebc6d471d99123db 100644
--- a/src/syncrepl.h
+++ b/src/syncrepl.h
@@ -60,4 +60,10 @@ sync_task_add(sync_ctx_t *sctx, isc_task_t *task);
 isc_result_t
 sync_barrier_wait(sync_ctx_t *sctx, const char *inst_name);
 
+void ATTR_NONNULLS
+sync_concurr_limit_wait(sync_ctx_t *sctx);
+
+void ATTR_NONNULLS
+sync_concurr_limit_signal(sync_ctx_t *sctx);
+
 #endif /* SYNCREPL_H_ */
-- 
1.8.3.1

_______________________________________________
Freeipa-devel mailing list
Freeipa-devel@redhat.com
https://www.redhat.com/mailman/listinfo/freeipa-devel

Reply via email to