Changes from v1:
- avoid referencing dead struct client (grep for 'invalidate_cli'),
by changing FSM callback prototype.
- insert 'void *priv' member into struct atcp_wr_state, and replace
cb_data1/cb_data2 callback parameters with (struct atcp_wr_state *, void *).
struct client / struct session, or whatever, may be stored in
atcp_wr_state::priv.
- minor API polishing and further abstraction
server/Makefile.am |1
server/atcp.c | 238 +++
server/atcp.h | 100 +++
server/bucket.c|8 -
server/object.c| 56 +--
server/server.c| 268 +
server/status.c|3
server/tabled.h| 46 ++---
8 files changed, 436 insertions(+), 284 deletions(-)
diff --git a/server/Makefile.am b/server/Makefile.am
index 5b53a0a..5e0abd5 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -4,6 +4,7 @@ INCLUDES= -I$(top_srcdir)/include @GLIB_CFLAGS@
@HAIL_CFLAGS@
sbin_PROGRAMS = tabled tdbadm
tabled_SOURCES = tabled.h \
+ atcp.c atcp.h \
bucket.c cldu.c config.c metarep.c object.c replica.c \
server.c status.c storage.c storparse.c util.c
tabled_LDADD = ../lib/libtdb.a \
diff --git a/server/atcp.c b/server/atcp.c
new file mode 100644
index 000..0050a68
--- /dev/null
+++ b/server/atcp.c
@@ -0,0 +1,238 @@
+
+/*
+ * Copyright 2010 Red Hat, Inc.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; see the file COPYING. If not, write to
+ * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
+ *
+ */
+
+#define _GNU_SOURCE
+#include tabled-config.h
+
+#include string.h
+#include stdlib.h
+#include errno.h
+#include sys/uio.h
+#include atcp.h
+
+bool atcp_cb_free(struct atcp_wr_state *wst, void *cb_data, bool done)
+{
+ free(cb_data);
+ return false;
+}
+
+static void atcp_write_complete(struct atcp_write *tmp)
+{
+ struct atcp_wr_state *wst = tmp-wst;
+
+ list_del(tmp-node);
+ list_add_tail(tmp-node, wst-write_compl_q);
+}
+
+static bool atcp_write_free(struct atcp_write *tmp, bool done)
+{
+ struct atcp_wr_state *wst = tmp-wst;
+ bool rcb = false;
+
+ wst-write_cnt -= tmp-length;
+ list_del_init(tmp-node);
+ if (tmp-cb)
+ rcb = tmp-cb(wst, tmp-cb_data, done);
+ free(tmp);
+
+ return rcb;
+}
+
+bool atcp_write_run_compl(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr;
+ bool do_loop;
+
+ do_loop = false;
+ while (!list_empty(wst-write_compl_q)) {
+ wr = list_entry(wst-write_compl_q.next,
+ struct atcp_write, node);
+ do_loop |= atcp_write_free(wr, true);
+ }
+ return do_loop;
+}
+
+void atcp_write_free_all(struct atcp_wr_state *wst)
+{
+ struct atcp_write *wr, *tmp;
+
+ atcp_write_run_compl(wst);
+ list_for_each_entry_safe(wr, tmp, wst-write_q, node) {
+ atcp_write_free(wr, false);
+ }
+}
+
+static bool atcp_writable(struct atcp_wr_state *wst)
+{
+ int n_iov;
+ struct atcp_write *tmp;
+ ssize_t rc;
+ struct iovec iov[ATCP_MAX_WR_IOV];
+
+ /* accumulate pending writes into iovec */
+ n_iov = 0;
+ list_for_each_entry(tmp, wst-write_q, node) {
+ if (n_iov == ATCP_MAX_WR_IOV)
+ break;
+ /* bleh, struct iovec should declare iov_base const */
+ iov[n_iov].iov_base = (void *) tmp-buf;
+ iov[n_iov].iov_len = tmp-togo;
+ n_iov++;
+ }
+
+ /* execute non-blocking write */
+do_write:
+ rc = writev(wst-fd, iov, n_iov);
+ if (rc 0) {
+ if (errno == EINTR)
+ goto do_write;
+ if (errno != EAGAIN)
+ goto err_out;
+ return true;
+ }
+
+ /* iterate through write queue, issuing completions based on
+* amount of data written
+*/
+ while (rc 0) {
+ int sz;
+
+ /* get pointer to first record on list */
+ tmp = list_entry(wst-write_q.next, struct atcp_write, node);
+
+ /* mark data consumed by decreasing tmp-len */
+ sz = (tmp-togo rc) ? tmp-togo : rc;
+ tmp-togo -= sz;
+ tmp-buf +=