Add ops table for storage back-ends, so that we can have several.

Signed-off-by: Pete Zaitcev <zait...@redhat.com>

---
 server/storage.c |   85 +++++++++++++++++++++---------------
 server/tabled.h  |  106 ++++++++++++++++++++++++++++++++++++---------
 2 files changed, 138 insertions(+), 53 deletions(-)

commit d2362897aa9efde5a30803a621a7d60e5bf5fd24
Author: Pete Zaitcev <zait...@yahoo.com>
Date:   Thu Nov 11 16:18:53 2010 -0700

    Add ops.

diff --git a/server/storage.c b/server/storage.c
index 5c6e3a7..1d3d6a1 100644
--- a/server/storage.c
+++ b/server/storage.c
@@ -107,8 +107,8 @@ static void stor_write_event(int fd, short events, void *userdata)
 /*
  * Open *cep using stn, set up chunk session if needed.
  */
-int stor_open(struct open_chunk *cep, struct storage_node *stn,
-             struct event_base *ev_base)
+static int chunk_open(struct open_chunk *cep, struct storage_node *stn,
+                     struct event_base *ev_base)
 {
        int rc;
 
@@ -126,8 +126,9 @@ int stor_open(struct open_chunk *cep, struct storage_node *stn,
        return 0;
 }
 
-int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *),
-                  uint64_t key, uint64_t size)
+static int chunk_put_start(struct open_chunk *cep,
+                          void (*cb)(struct open_chunk *),
+                          uint64_t key, uint64_t size)
 {
        char stckey[STOR_KEY_SLEN+1];
 
@@ -161,8 +162,9 @@ int stor_put_start(struct open_chunk *cep, void (*cb)(struct open_chunk *),
        return 0;
 }
 
-int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *),
-                  uint64_t key, uint64_t *psize)
+static int chunk_open_read(struct open_chunk *cep,
+                          void (*cb)(struct open_chunk *),
+                          uint64_t key, uint64_t *psize)
 {
        char stckey[STOR_KEY_SLEN+1];
        uint64_t size;
@@ -203,7 +205,7 @@ int stor_open_read(struct open_chunk *cep, void (*cb)(struct open_chunk *),
 /*
  * FIXME We don't cache sessions while tabled is being debugged. Maybe later.
  */
-void stor_close(struct open_chunk *cep)
+static void chunk_close(struct open_chunk *cep)
 {
        if (cep->stc) {
                stor_node_put(cep->node);
@@ -227,14 +229,11 @@ void stor_close(struct open_chunk *cep)
 }
 
 /*
- * The stor_abort has an annoying convention of being possibly called
- * on an unopened open_chunk. We deal with that.
- *
  * There's no "abort" call for an existing transfer. We could complete
  * the transfer instead of trashing the whole session, but that may involve
  * sending or receiving gigabytes. So we just cycle the session.
  */
-void stor_abort(struct open_chunk *cep)
+static void chunk_abort(struct open_chunk *cep)
 {
        char stckey[STOR_KEY_SLEN+1];
        int rc;
@@ -284,7 +283,7 @@ void stor_abort(struct open_chunk *cep)
        cep->key = 0;
 }
 
-ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len)
+static ssize_t chunk_put_buf(struct open_chunk *cep, void *data, size_t len)
 {
        int rc;
 
@@ -307,7 +306,7 @@ ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len)
        return rc;
 }
 
-bool stor_put_end(struct open_chunk *cep)
+static bool chunk_put_end(struct open_chunk *cep)
 {
        if (!cep->stc)
                return true;
@@ -323,7 +322,7 @@ bool stor_put_end(struct open_chunk *cep)
  * This saves the object.c from the trouble of arming and disarming it,
  * at the cost of rather subtle semantics.
  */
-ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t req_len)
+static ssize_t chunk_get_buf(struct open_chunk *cep, void *data, size_t req_len)
 {
        size_t xfer_len;
        ssize_t ret;
@@ -358,7 +357,7 @@ ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t req_len)
        return ret;
 }
 
-int stor_obj_del(struct storage_node *stn, uint64_t key)
+static int chunk_obj_del(struct storage_node *stn, uint64_t key)
 {
        struct st_client *stc;
        char stckey[STOR_KEY_SLEN+1];
@@ -379,7 +378,7 @@ int stor_obj_del(struct storage_node *stn, uint64_t key)
 /*
  * XXX WTF?! This accidentially tests a node instead of object! FIXME
  */
-bool stor_obj_test(struct open_chunk *cep, uint64_t key)
+static bool chunk_obj_test(struct open_chunk *cep, uint64_t key)
 {
        struct st_keylist *klist;
 
@@ -393,6 +392,27 @@ bool stor_obj_test(struct open_chunk *cep, uint64_t key)
        return true;
 }
 
+/* Return 0 if the node checks out ok */
+static int chunk_node_check(struct storage_node *stn)
+{
+       struct st_client *stc;
+       int rc;
+
+       if (!stn->hostname)
+               return -1;
+
+       rc = stor_new_stc(stn, &stc);
+       if (rc < 0) {
+               applog(LOG_INFO,
+                      "Error %d connecting to chunkd on host %s",
+                      rc, stn->hostname);
+               return -1;
+       }
+
+       stc_free(stc);
+       return 0;
+}
+
 static struct storage_node *_stor_node_by_nid(uint32_t nid)
 {
        struct storage_node *sn;
@@ -416,6 +436,20 @@ struct storage_node *stor_node_by_nid(uint32_t nid)
        return sn;
 }
 
+struct st_node_ops stor_ops_chunk = {
+       .open =         chunk_open,
+       .open_read =    chunk_open_read,
+       .close =        chunk_close,
+       .abort =        chunk_abort,
+       .put_start =    chunk_put_start,
+       .put_buf =      chunk_put_buf,
+       .put_end =      chunk_put_end,
+       .get_buf =      chunk_get_buf,
+       .obj_del =      chunk_obj_del,
+       .obj_test =     chunk_obj_test,
+       .node_check =   chunk_node_check,
+};
+
 static int stor_add_node_addr(struct storage_node *sn,
                              const char *hostname, const char *portstr)
 {
@@ -475,6 +509,7 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr,
                }
                memset(sn, 0, sizeof(struct storage_node));
                sn->id = nid;
+               sn->ops = &stor_ops_chunk;
 
                if ((sn->hostname = strdup(hostname)) == NULL) {
                        applog(LOG_WARNING, "No core");
@@ -498,24 +533,6 @@ void stor_add_node(uint32_t nid, const char *hostname, const char *portstr,
        g_mutex_unlock(tabled_srv.bigmutex);
 }
 
-/* Return 0 if the node checks out ok */
-int stor_node_check(struct storage_node *stn)
-{
-       struct st_client *stc;
-       int rc;
-
-       rc = stor_new_stc(stn, &stc);
-       if (rc < 0) {
-               applog(LOG_INFO,
-                      "Error %d connecting to chunkd on host %s",
-                      rc, stn->hostname);
-               return -1;
-       }
-
-       stc_free(stc);
-       return 0;
-}
-
 void stor_stats()
 {
        struct storage_node *sn;
diff --git a/server/tabled.h b/server/tabled.h
index 00d8f84..6ab32a3 100644
--- a/server/tabled.h
+++ b/server/tabled.h
@@ -90,17 +90,40 @@ struct geo {
        char                    *rack;
 };
 
+struct storage_node;
+struct open_chunk;
+
+struct st_node_ops {
+       int (*open)(struct open_chunk *cep, struct storage_node *stn,
+                   struct event_base *ev_base);
+       int (*open_read)(struct open_chunk *cep,
+                        void (*cb)(struct open_chunk *),
+                        uint64_t key, uint64_t *psz);
+       void (*close)(struct open_chunk *cep);
+       void (*abort)(struct open_chunk *cep);
+       int (*put_start)(struct open_chunk *cep,
+                        void (*cb)(struct open_chunk *),
+                        uint64_t key, uint64_t size);
+       ssize_t (*put_buf)(struct open_chunk *cep, void *data, size_t len);
+       bool (*put_end)(struct open_chunk *cep);
+       ssize_t (*get_buf)(struct open_chunk *cep, void *data, size_t len);
+       int (*obj_del)(struct storage_node *stn, uint64_t key);
+       bool (*obj_test)(struct open_chunk *cep, uint64_t key);
+       int (*node_check)(struct storage_node *stn);
+};
+
 struct storage_node {
        struct list_head        all_link;
        uint32_t                id;
        bool                    up;
        time_t                  last_up;
+       struct st_node_ops      *ops;
+
+       int ref;                /* number of open_chunk or other */
 
        unsigned                alen;
        struct sockaddr_in6     addr;
        char                    *hostname;
-
-       int ref;                /* number of open_chunk or other */
 };
 
 typedef bool (*cli_evt_func)(struct client *, unsigned int,
@@ -119,6 +142,8 @@ struct open_chunk {
        uint64_t                size;
        uint64_t                done;
 
+       struct st_node_ops      *ops;
+
        /* chunk */
        int                     wfd;
        bool                    w_armed;
@@ -404,25 +429,68 @@ extern void read_config(void);
 /* storage.c */
 extern struct storage_node *stor_node_get(struct storage_node *stn);
 extern void stor_node_put(struct storage_node *stn);
-extern int stor_open(struct open_chunk *cep, struct storage_node *stn,
-                    struct event_base *ev_base);
-extern int stor_open_read(struct open_chunk *cep,
-                         void (*cb)(struct open_chunk *),
-                         uint64_t key, uint64_t *psz);
-extern void stor_close(struct open_chunk *cep);
-extern void stor_abort(struct open_chunk *cep);
-extern int stor_put_start(struct open_chunk *cep,
+static inline int stor_open(struct open_chunk *cep, struct storage_node *stn,
+                           struct event_base *ev_base)
+{
+       cep->ops = stn->ops;
+       return cep->ops->open(cep, stn, ev_base);
+}
+static inline int stor_open_read(struct open_chunk *cep,
+                                void (*cb)(struct open_chunk *),
+                                uint64_t key, uint64_t *psz)
+{
+       return cep->ops->open_read(cep, cb, key, psz);
+}
+/*
+ * The stor_abort and stor_close have an annoying convention of being possibly
+ * called on an unopened open_chunk. We deal with that.
+ */
+static inline void stor_close(struct open_chunk *cep)
+{
+       if (cep->ops)
+               cep->ops->close(cep);
+}
+static inline void stor_abort(struct open_chunk *cep)
+{
+       if (cep->ops)
+               cep->ops->abort(cep);
+}
+static inline int stor_put_start(struct open_chunk *cep,
                          void (*cb)(struct open_chunk *),
-                         uint64_t key, uint64_t size);
-extern ssize_t stor_put_buf(struct open_chunk *cep, void *data, size_t len);
-extern bool stor_put_end(struct open_chunk *cep);
-extern ssize_t stor_get_buf(struct open_chunk *cep, void *data, size_t len);
-extern int stor_obj_del(struct storage_node *stn, uint64_t key);
-extern bool stor_obj_test(struct open_chunk *cep, uint64_t key);
+                         uint64_t key, uint64_t size)
+{
+       return cep->ops->put_start(cep, cb, key, size);
+}
+static inline ssize_t stor_put_buf(struct open_chunk *cep, void *data,
+                                  size_t len)
+{
+       return cep->ops->put_buf(cep, data, len);
+}
+static inline bool stor_put_end(struct open_chunk *cep)
+{
+       return cep->ops->put_end(cep);
+}
+static inline ssize_t stor_get_buf(struct open_chunk *cep, void *data,
+                                  size_t len)
+{
+       return cep->ops->get_buf(cep, data, len);
+}
+static inline int stor_obj_del(struct storage_node *stn, uint64_t key)
+{
+       return stn->ops->obj_del(stn, key);
+}
+static inline bool stor_obj_test(struct open_chunk *cep, uint64_t key)
+{
+       return cep->ops->obj_test(cep, key);
+}
+static inline int stor_node_check(struct storage_node *stn)
+{
+       return stn->ops->node_check(stn);
+}
 extern struct storage_node *stor_node_by_nid(uint32_t nid);
-extern void stor_add_node(uint32_t nid, const char *hostname,
-                         const char *portstr, struct geo *locp);
-extern int stor_node_check(struct storage_node *stn);
+extern void stor_add_node(uint32_t nid,
+                         const char *hostname, const char *portstr,
+                         struct geo *locp);
 extern void stor_stats(void);
 extern bool stor_status(struct client *cli, GList *content);
 
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to