We replace a comment with code to better show the intent. Once we have
several threads, it will be easier to plug thread-local storage (TLS)
into this.

Signed-off-by: Pete Zaitcev <[email protected]>

---
 server/replica.c |   69 ++++++++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 28 deletions(-)

Not sure if we actually want this at this stage, but I coded it up
yesterday after we talked about argument passing and TLS, to see just
how many rep_xxx functions need extra arguments. It was not too bad.

commit 32c8b8072c901e549ecbd8f1a29581b37f6cec16
Author: Master <[email protected]>
Date:   Tue Dec 15 21:20:22 2009 -0700

    Pass arguments to a thread by official means.

diff --git a/server/replica.c b/server/replica.c
index 067accb..83b559d 100644
--- a/server/replica.c
+++ b/server/replica.c
@@ -27,11 +27,16 @@
 #include <elist.h>
 #include "tabled.h"
 
+struct rep_arg {
+       struct event_base *evbase;
+};
+
 /*
  * Replication Job
  */
 struct rep_job {
        struct list_head jlink;
+       struct rep_arg *arg;
 
        uint64_t oid;
        uint64_t size;          /* all of the object */
@@ -57,11 +62,17 @@ static struct rep_jobs active = { 0, LIST_HEAD_INIT(active.jlist) };
 static struct rep_jobs queue = { 0, LIST_HEAD_INIT(queue.jlist) };
 static struct rep_jobs done = { 0, LIST_HEAD_INIT(done.jlist) };
 
-static struct event_base *evbase;
+/*
+ * These should actually be thread-local, but we only have one thread.
+ */
 static struct event kscan_timer;       /* db4 key rescan timer */
+static time_t kscan_last;
+
+/*
+ * These are module-scope things: global locks and flags, thread list, etc.
+ */
 static bool kscan_enabled = false;
 static GThread *scan_thread;
-static time_t kscan_last;
 
 static void job_dispatch(void);
 
@@ -253,7 +264,7 @@ static void job_dispatch()
        if (!job->buf)
                goto err_malloc;
 
-       rc = stor_open(&job->in_ce, job->src, evbase);
+       rc = stor_open(&job->in_ce, job->src, job->arg->evbase);
        if (rc) {
                applog(LOG_WARNING, "Cannot open input chunk, nid %u (%d)",
                       job->src->id, rc);
@@ -261,7 +272,7 @@ static void job_dispatch()
        }
        job->in_ce.cli = job;
 
-       rc = stor_open(&job->out_ce, job->dst, evbase);
+       rc = stor_open(&job->out_ce, job->dst, job->arg->evbase);
        if (rc) {
                applog(LOG_WARNING, "Cannot open output chunk, nid %u (%d)",
                       job->dst->id, rc);
@@ -366,7 +377,8 @@ static struct rep_job *job_find_by_oid(uint64_t oid)
 }
 
 /* start replicating the key somewhere */
-static void rep_job_start(size_t klen, struct db_obj_key *key,
+static void rep_job_start(struct rep_arg *arg,
+                         size_t klen, struct db_obj_key *key,
                          uint64_t oid, uint64_t objsize,
                          int nnum, struct storage_node *nvec[])
 {
@@ -386,18 +398,15 @@ static void rep_job_start(size_t klen, struct db_obj_key *key,
        job = job_alloc(klen, key);
        if (!job)
                goto err_alloc;
+       job->arg = arg;
        job->oid = oid;
        job->size = objsize;
        job->src = job_select_src(nnum, nvec);
-       if (!job->src) {
-               /* P3 */ applog(LOG_INFO, "no src oid %llX", (long long) oid);
+       if (!job->src)
                goto err_src;
-       }
        job->dst = job_select_dst(nnum, nvec);
-       if (!job->dst) {
-               /* P3 */ applog(LOG_INFO, "no dst oid %llX", (long long) oid);
+       if (!job->dst)
                goto err_dst;
-       }
        if (job->src->id == job->dst->id) {
                /* Is this bad enough to invoke exit(1) right here? */
                applog(LOG_ERR, "Internal error, copy from/to nid %u",
@@ -540,7 +549,8 @@ static int rep_scan_parse(struct cursor *cp, struct db_obj_ent *obj)
 }
 
 /* meat of scan - check if replication is need on the key */
-static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
+static void rep_scan_verify(struct rep_arg *arg,
+                           struct cursor *cp, struct db_obj_ent *obj)
 {
        char bucket_name[65];
        char object_name[1025];
@@ -595,9 +605,8 @@ static void rep_scan_verify(struct cursor *cp, struct db_obj_ent *obj)
               allcnt, redcnt);
 
        if (redcnt < MAXWAY) {          /* maybe have MINWAY too? */
-               rep_job_start(cp->klen, cp->key, oid,
-                             GUINT64_FROM_LE(obj->size),
-                             redcnt, redvec);
+               rep_job_start(arg, cp->klen, cp->key, oid,
+                             GUINT64_FROM_LE(obj->size), redcnt, redvec);
        }
 
        for (i = 0; i < redcnt; i++)
@@ -723,7 +732,7 @@ static void rep_retire(void)
        }
 }
 
-static void rep_scan(void)
+static void rep_scan(struct rep_arg *arg)
 {
        struct cursor cur;
        struct db_obj_ent *obj;
@@ -767,7 +776,7 @@ static void rep_scan(void)
                }
 
                if (!GUINT32_FROM_LE(obj->flags) & DB_OBJ_INLINE)
-                       rep_scan_verify(&cur, obj);
+                       rep_scan_verify(arg, &cur, obj);
 
                free(obj);
                kcnt++;
@@ -792,17 +801,20 @@ static void add_kscan_timer(void)
 
 static void tdb_keyscan(int fd, short events, void *userdata)
 {
+       struct rep_arg *arg = userdata;
+
        if (kscan_enabled)
-               rep_scan();
+               rep_scan(arg);
        add_kscan_timer();
 }
 
 static gpointer rep_thread_func(gpointer data)
 {
+       struct rep_arg *arg = data;
        int rc;
 
-       evtimer_set(&kscan_timer, tdb_keyscan, NULL);
-       event_base_set(evbase, &kscan_timer);
+       evtimer_set(&kscan_timer, tdb_keyscan, arg);
+       event_base_set(arg->evbase, &kscan_timer);
 
        /*
         * We must add an event now, or else event_base_dispatch will
@@ -811,7 +823,7 @@ static gpointer rep_thread_func(gpointer data)
        add_kscan_timer();
 
        for (;;) {
-               rc = event_base_dispatch(evbase);
+               rc = event_base_dispatch(arg->evbase);
                applog(LOG_ERR, "rep event_base_dispatch exits (%d)", rc);
                sleep(300);     /* Should not happen, so maybe exit(1)? */
        }
@@ -821,15 +833,16 @@ static gpointer rep_thread_func(gpointer data)
 void rep_init(struct event_base *ev_base)
 {
        GError *error;
+       struct rep_arg *arg;
 
-       /* We could pass this event_base as an arg to our replica thread
-        * via g_thread_create(), but that seems pointless given that
-        * we are storing the event base as a module-local static
-        * anyway.
-        */
-       evbase = ev_base;
+       arg = malloc(sizeof(struct rep_arg));
+       if (!arg) {
+               applog(LOG_ERR, "No core");
+               exit(1);
+       }
+       arg->evbase = ev_base;
 
-       scan_thread = g_thread_create(rep_thread_func, NULL, FALSE, &error);
+       scan_thread = g_thread_create(rep_thread_func, arg, FALSE, &error);
        if (scan_thread == NULL) {
                applog(LOG_ERR, "Failed to start replication thread: %s",
                       error->message);
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to