Next step on the road to data replication. At this point the interface
with Chunk through CLD is basically done.

Signed-off-by: Pete Zaitcev <[email protected]>

diff --git a/server/cldu.c b/server/cldu.c
index f1c2d4d..57332e3 100644
--- a/server/cldu.c
+++ b/server/cldu.c
@@ -10,16 +10,18 @@
 #include <string.h>
 #include <unistd.h>
 #include <event.h>
-#include <netdb.h>
-#include <arpa/nameser.h>
-#include <netinet/in.h>
-#include <resolv.h>            /* must follow in.h, nameser.h */
 #include <errno.h>
 #include <cldc.h>
+#include <elist.h>
 #include "tabled.h"
 
 #define ALIGN8(n)      ((8 - ((n) & 7)) & 7)
 
+struct chunk_node {
+       struct list_head link;
+       char name[65];
+};
+
 #define N_CLD          10      /* 5 * (v4+v6) */
 
 struct cld_host {
@@ -35,11 +37,18 @@ struct cld_session {
        int actx;               /* Active host cldv[actx] */
        struct cld_host cldv[N_CLD];
 
+       char *thiscell;
        struct event ev;        /* Associated with fd */
        char *cfname;           /* /tabled-cell directory */
        struct cldc_fh *cfh;    /* /tabled-cell directory, keep open for scan */
-       char *ffname;           /* /tabled-cell/ourhost */
-       struct cldc_fh *ffh;    /* /tabled-cell/ourhost, keep open for lock */
+       char *ffname;           /* /tabled-cell/thishost */
+       struct cldc_fh *ffh;    /* /tabled-cell/thishost, keep open for lock */
+       char *xfname;           /* /chunk-cell directory */
+       struct cldc_fh *xfh;    /* /chunk-cell directory */
+       char *yfname;           /* /chunk-cell/NID file */
+       struct cldc_fh *yfh;    /* /chunk-cell/NID file */
+
+       struct list_head chunks;        /* found in xfname, struct chunk_node */
 
        void (*state_cb)(enum st_cld);
 };
@@ -47,14 +56,19 @@ struct cld_session {
 static int cldu_set_cldc(struct cld_session *sp, int newactive);
 static int cldu_new_sess(struct cldc_call_opts *carg, enum cle_err_codes errc);
 static int cldu_open_c_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
-#if 0
-static int cldu_close_c_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
-#endif
 static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
 static int cldu_lock_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
 static int cldu_put_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
 static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
-static void add_remote(char *name);
+static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
+static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
+static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
+static void next_chunk(struct cld_session *sp);
+static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
+static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc);
+static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc);
+static void add_remote(const char *name);
+static void add_chunk_node(struct cld_session *sp, const char *name);
 
 /*
  * Identify the next host to be tried.
@@ -78,45 +92,35 @@ static int cldu_nextactive(struct cld_session *sp)
        return sp->actx;
 }
 
-static int cldu_setcell(struct cld_session *sp, const char *thiscell)
+static int cldu_setcell(struct cld_session *sp,
+                       const char *thiscell, const char *thishost)
 {
-       size_t cnlen;
-       size_t mlen;
        char *mem;
 
        if (thiscell == NULL) {
                thiscell = "default";
        }
 
-       cnlen = strlen(thiscell);
-
-#if 0 /* old way without directories */
-       cnlen = strlen(thiscell);
-       mlen = sizeof("/tabled-")-1;
-       mlen += cnlen;
-       mlen += sizeof("-master")-1;
-       mlen++;
-       mem = malloc(mlen);
-       sprintf(mem, "/tabled-%s-master", thiscell);
-       sp->mfname = mem;
-#endif
+       sp->thiscell = strdup(thiscell);
+       if (!sp->thiscell)
+               goto err_oom;
 
-       mlen = sizeof("/tabled-")-1;
-       mlen += cnlen;
-       mlen++; // '\0'
-       mem = malloc(mlen);
-       sprintf(mem, "/tabled-%s", thiscell);
+       if (asprintf(&mem, "/tabled-%s", thiscell) == -1)
+               goto err_oom;
        sp->cfname = mem;
 
-       mlen = sizeof("/tabled-")-1;
-       mlen += cnlen;
-       mlen++; // '/'
-       mlen += strlen(tabled_srv.ourhost);
-       mlen++; // '\0'
-       mem = malloc(mlen);
-       sprintf(mem, "/tabled-%s/%s", thiscell, tabled_srv.ourhost);
+       if (asprintf(&mem, "/tabled-%s/%s", thiscell, thishost) == -1)
+               goto err_oom;
        sp->ffname = mem;
 
+       if (asprintf(&mem, "/chunk-%s", thiscell) == -1)
+               goto err_oom;
+       sp->xfname = mem;
+
+       return 0;
+
+err_oom:
+       applog(LOG_WARNING, "OOM in cldu");
        return 0;
 }
 
@@ -334,19 +338,6 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, 
enum cle_err_codes errc)
        if (debugging)
                applog(LOG_DEBUG, "CLD directory \"%s\" created", sp->cfname);
 
-#if 0 /* Don't close the directory, we'll rescan later instead */
-       /*
-        * We don't use directory handle to open files in it, so close it.
-        */
-       memset(&copts, 0, sizeof(copts));
-       copts.cb = cldu_close_c_cb;
-       copts.private = sp;
-       rc = cldc_close(sp->cfh, &copts);
-       if (rc) {
-               applog(LOG_ERR, "cldc_close call error %d", rc);
-       }
-#endif
-
        /*
         * Then, create the membership file for us.
         */
@@ -362,36 +353,6 @@ static int cldu_open_c_cb(struct cldc_call_opts *carg, 
enum cle_err_codes errc)
        return 0;
 }
 
-#if 0
-static int cldu_close_c_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc)
-{
-       struct cld_session *sp = carg->private;
-       struct cldc_call_opts copts;
-       int rc;
-
-       if (errc != CLE_OK) {
-               applog(LOG_ERR, "CLD close(%s) failed: %d", sp->cfname, errc);
-               return 0;
-       }
-
-/* P3 */ applog(LOG_INFO, "CLD close success, opening %s", sp->ffname);
-
-       /*
-        * Then, create the membership file for us.
-        */
-       memset(&copts, 0, sizeof(copts));
-       copts.cb = cldu_open_f_cb;
-       copts.private = sp;
-       rc = cldc_open(sp->lib->sess, &copts, sp->ffname,
-                      COM_WRITE | COM_LOCK | COM_CREATE,
-                      CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->ffh);
-       if (rc) {
-               applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->ffname, rc);
-       }
-       return 0;
-}
-#endif
-
 static int cldu_open_f_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 {
        struct cld_session *sp = carg->private;
@@ -437,7 +398,7 @@ static int cldu_lock_cb(struct cldc_call_opts *carg, enum 
cle_err_codes errc)
        int rc;
 
        if (errc != CLE_OK) {
-               applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->cfname, errc);
+               applog(LOG_ERR, "CLD lock(%s) failed: %d", sp->ffname, errc);
                return 0;
        }
 
@@ -490,8 +451,8 @@ static int cldu_put_cb(struct cldc_call_opts *carg, enum 
cle_err_codes errc)
 static int cldu_get_1_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
 {
        struct cld_session *sp = carg->private;
-       // struct cldc_call_opts copts;
-       // int rc;
+       struct cldc_call_opts copts;
+       int rc;
        const char *ptr;
        int dir_len;
        int total_len, rec_len, name_len;
@@ -518,7 +479,7 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, enum 
cle_err_codes errc)
                else
                        buf[64] = 0;
 
-               if (!strcmp(buf, tabled_srv.ourhost)) {
+               if (!strcmp(buf, tabled_srv.ourhost)) { /* use thishost XXX */
                        if (debugging)
                                applog(LOG_DEBUG, " %s (ourselves)", buf);
                } else {
@@ -534,13 +495,249 @@ static int cldu_get_1_cb(struct cldc_call_opts *carg, 
enum cle_err_codes errc)
        if (sp->state_cb)
                (*sp->state_cb)(ST_CLD_ACTIVE);
 
+       /*
+        * Now we can collect the Chunk nodes in our cell.
+        */
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_open_x_cb;
+       copts.private = sp;
+       rc = cldc_open(sp->lib->sess, &copts, sp->xfname,
+                      COM_READ | COM_DIRECTORY,
+                      CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->xfh);
+       if (rc) {
+               applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->xfname, rc);
+       }
+       return 0;
+}
+
+static int cldu_open_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+{
+       struct cld_session *sp = carg->private;
+       struct cldc_call_opts copts;
+       int rc;
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD open(%s) failed: %d", sp->xfname, errc);
+               /* XXX recycle, maybe Chunks aren't up yet. */
+               return 0;
+       }
+       if (sp->xfh == NULL) {
+               applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->xfname);
+               return 0;
+       }
+       if (!sp->xfh->valid) {
+               applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->xfname);
+               return 0;
+       }
+
+       // if (debugging)
+               applog(LOG_DEBUG, "CLD directory \"%s\" opened", sp->xfname);
+
+       /*
+        * Read the directory.
+        */
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_get_x_cb;
+       copts.private = sp;
+       rc = cldc_get(sp->xfh, &copts, false);
+       if (rc) {
+               applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->cfname, rc);
+       }
+       return 0;
+}
+
+static int cldu_get_x_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+{
+       struct cld_session *sp = carg->private;
+       struct cldc_call_opts copts;
+       int rc;
+       const char *ptr;
+       int dir_len;
+       int total_len, rec_len, name_len;
+       char buf[65];
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD get(%s) failed: %d", sp->xfname, errc);
+               return 0;
+       }
+
+       if (debugging)
+               applog(LOG_DEBUG, "Known Chunk nodes");
+
+       ptr = carg->u.get.buf;
+       dir_len = carg->u.get.size;
+       while (dir_len) {
+               name_len = GUINT16_FROM_LE(*(uint16_t *)ptr);
+               rec_len = name_len + 2;
+               total_len = rec_len + ALIGN8(rec_len);
+
+               strncpy(buf, ptr+2, 64);
+               if (name_len < 64)
+                       buf[name_len] = 0;
+               else
+                       buf[64] = 0;
+
+               if (debugging)
+                       applog(LOG_DEBUG, " %s", buf);
+               add_chunk_node(sp, buf);
+
+               ptr += total_len;
+               dir_len -= total_len;
+       }
+
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_close_x_cb;
+       copts.private = sp;
+       rc = cldc_close(sp->xfh, &copts);
+       if (rc) {
+               applog(LOG_ERR, "cldc_close call error %d", rc);
+       }
+       return 0;
+}
+
+static int cldu_close_x_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc)
+{
+       struct cld_session *sp = carg->private;
+       // struct cldc_call_opts copts;
+       // int rc;
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD close(%s) failed: %d", sp->xfname, errc);
+               return 0;
+       }
+
+       if (list_empty(&sp->chunks))
+               applog(LOG_INFO, "No Chunk nodes found");
+       else
+               next_chunk(sp);
+       return 0;
+}
+
+static void next_chunk(struct cld_session *sp)
+{
+       struct chunk_node *np;
+       char *mem;
+       struct cldc_call_opts copts;
+       int rc;
+
+       np = list_entry(sp->chunks.next, struct chunk_node, link);
+
+       if (asprintf(&mem, "/chunk-%s/%s", sp->thiscell, np->name) == -1) {
+               applog(LOG_WARNING, "OOM in cldu");
+               return;
+       }
+       sp->yfname = mem;
+
+       if (debugging)
+               applog(LOG_DEBUG, "opening chunk parameters %s", sp->yfname);
+
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_open_y_cb;
+       copts.private = sp;
+       rc = cldc_open(sp->lib->sess, &copts, sp->yfname,
+                      COM_READ,
+                      CE_MASTER_FAILOVER | CE_SESS_FAILED, &sp->yfh);
+       if (rc) {
+               applog(LOG_ERR, "cldc_open(%s) call error: %d", sp->yfname, rc);
+       }
+}
+
+static int cldu_open_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+{
+       struct cld_session *sp = carg->private;
+       struct cldc_call_opts copts;
+       int rc;
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD open(%s) failed: %d", sp->yfname, errc);
+               free(sp->yfname);
+               sp->yfname = NULL;
+               return 0;
+       }
+       if (sp->yfh == NULL) {
+               applog(LOG_ERR, "CLD open(%s) failed: NULL fh", sp->yfname);
+               free(sp->yfname);
+               sp->yfname = NULL;
+               return 0;
+       }
+       if (!sp->yfh->valid) {
+               applog(LOG_ERR, "CLD open(%s) failed: invalid fh", sp->yfname);
+               free(sp->yfname);
+               sp->yfname = NULL;
+               return 0;
+       }
+
+       /*
+        * Read the Chunk's parameter file.
+        */
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_get_y_cb;
+       copts.private = sp;
+       rc = cldc_get(sp->yfh, &copts, false);
+       if (rc) {
+               applog(LOG_ERR, "cldc_get(%s) call error: %d", sp->yfname, rc);
+       }
+       return 0;
+}
+
+static int cldu_get_y_cb(struct cldc_call_opts *carg, enum cle_err_codes errc)
+{
+       struct cld_session *sp = carg->private;
+       struct cldc_call_opts copts;
+       int rc;
+       const char *ptr;
+       int len;
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD get(%s) failed: %d", sp->yfname, errc);
+               goto close_and_next;    /* spaghetti */
+       }
+
+       ptr = carg->u.get.buf;
+       len = carg->u.get.size;
+       if (debugging)
+               applog(LOG_DEBUG,
+                      "got %d bytes from %s\n", len, sp->yfname);
+       stor_add_node(ptr, len);
+
+close_and_next:
+       memset(&copts, 0, sizeof(copts));
+       copts.cb = cldu_close_y_cb;
+       copts.private = sp;
+       rc = cldc_close(sp->yfh, &copts);
+       if (rc) {
+               applog(LOG_ERR, "cldc_close call error %d", rc);
+       }
+       return 0;
+}
+
+static int cldu_close_y_cb(struct cldc_call_opts *carg, enum cle_err_codes 
errc)
+{
+       struct cld_session *sp = carg->private;
+       struct chunk_node *np;
+       // struct cldc_call_opts copts;
+       // int rc;
+
+       if (errc != CLE_OK) {
+               applog(LOG_ERR, "CLD close(%s) failed: %d", sp->yfname, errc);
+               return 0;
+       }
+
+       free(sp->yfname);
+       sp->yfname = NULL;
+
+       np = list_entry(sp->chunks.next, struct chunk_node, link);
+       list_del(&np->link);
+
+       if (!list_empty(&sp->chunks))
+               next_chunk(sp);
        return 0;
 }
 
 /*
  * FIXME need to read port number from the file (port:<space>num).
  */
-static void add_remote(char *name)
+static void add_remote(const char *name)
 {
        struct db_remote *rp;
 
@@ -559,6 +756,23 @@ static void add_remote(char *name)
 }
 
 /*
+ * Save the available chunk numbers into a local list temporarily,
+ * because we don't want to hog the event thread with reading.
+ */
+static void add_chunk_node(struct cld_session *sp, const char *name)
+{
+       struct chunk_node *np;
+
+       np = malloc(sizeof(*np));
+       if (!np)
+               return;
+
+       strncpy(np->name, name, sizeof(np->name)-1);
+       np->name[sizeof(np->name)-1] = 0;
+       list_add_tail(&np->link, &sp->chunks);
+}
+
+/*
  */
 static struct cld_session ses;
 
@@ -569,6 +783,9 @@ int cld_begin(const char *thishost, const char *thiscell,
              void (*cb)(enum st_cld))
 {
 
+       cldc_init();
+       INIT_LIST_HEAD(&ses.chunks);
+
        /*
         * As long as we permit pre-seeding lists of CLD hosts,
         * we cannot wipe our session anymore. Note though, as long
@@ -577,7 +794,7 @@ int cld_begin(const char *thishost, const char *thiscell,
        // memset(&ses, 0, sizeof(struct cld_session));
        ses.state_cb = cb;
 
-       if (cldu_setcell(&ses, thiscell)) {
+       if (cldu_setcell(&ses, thiscell, thishost)) {
                /* Already logged error */
                goto err_cell;
        }
@@ -651,6 +868,9 @@ void cld_end(void)
 
        free(ses.cfname);
        free(ses.ffname);
+       free(ses.xfname);
+       free(ses.yfname);
+       free(ses.thiscell);
 }
 
 void cldu_add_host(const char *hostname, unsigned int port)
diff --git a/server/storage.c b/server/storage.c
index cf07b41..8a621b5 100644
--- a/server/storage.c
+++ b/server/storage.c
@@ -309,6 +309,14 @@ bool stor_obj_test(struct open_chunk *cep, uint64_t key)
 }
 
 /*
+ * Add a node using its parameters file (not nul-terminated).
+ */
+void stor_add_node(const char *data, size_t len)
+{
+       /* P3 */ applog(LOG_INFO, "Adding some node or sumthin.");
+}
+
+/*
  * Wait for chunkd instances to come up, in case we start simultaneously
  * from a "make check" or a parallel boot script on the same computer,
  * of if a datacenter is being brought up.
diff --git a/server/tabled.h b/server/tabled.h
index d12d032..4c7cd95 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -312,6 +312,7 @@ extern bool stor_put_end(struct open_chunk *cep);
 extern ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t len);
 extern int stor_obj_del(struct storage_node *stn, uint64_t key);
 extern bool stor_obj_test(struct open_chunk *cep, uint64_t key);
+extern void stor_add_node(const char *data, size_t len);
 extern void stor_init(void);
 
 #endif /* __TABLED_H__ */
diff --git a/test/start-daemon b/test/start-daemon
index bb7ee82..85ef700 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -18,12 +18,10 @@ fi
 
 # May be different on Solaris... like /usr/libexec or such.
 cld -d data/cld -P cld.pid -p 18081 -E
-
-sleep 3
-
 chunkd -P chunkd.pid -f $top_srcdir/test/chunkd-test.conf -E
 
-sleep 3
+# 3 is enough, but we like to let chunkd to come up eary and register with CLD
+sleep 7
 
 ../server/tabled -P tabled.pid -C $top_srcdir/test/tabled-test.conf -E
 
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to