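Below is my current work in progress, which completes the logical
separation of packets and messages. Messages may now be larger than a
single packet, which in turn lets GET and PUT each be handled as a single,
contiguous message. Each packet carries CPF_FIRST/CPF_LAST flags. The
receiver reassembles fragments into a per-session message buffer, and the
new ACK-FRAG message acknowledges individual fragments. DATA messages are
eliminated. This is a major network protocol change.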
This work eliminates several problems, most notably (for Pete) the PUT
"dropping dup" errors. It also simplifies the code considerably, because
the client and server message handlers no longer have to deal with
low-level packet details such as DATA stream IDs and segment reassembly.
Rediffed against commit 28d97076775b0c4c5fef27621c4579ef70882d6f.
include/cld_msg.h | 36 ++--
include/cldc.h | 30 +--
lib/cldc.c | 372 +++++++++++++++++++++++++------------------------
server/cld.h | 8 -
server/msg.c | 347 ++++-----------------------------------------
server/server.c | 97 ++++++++++--
server/session.c | 120 +++++++++++----
test/it-works.c | 2
test/load-file-event.c | 2
test/lock-file-event.c | 2
test/save-file-event.c | 2
test/start-daemon | 2
12 files changed, 432 insertions(+), 588 deletions(-)
diff --git a/include/cld_msg.h b/include/cld_msg.h
index e1fd9f1..124acbb 100644
--- a/include/cld_msg.h
+++ b/include/cld_msg.h
@@ -36,7 +36,10 @@ enum {
CLD_MAX_USERNAME = 32, /**< includes req. nul */
CLD_MAX_SECRET_KEY = 128, /**< includes req. nul */
- CLD_MAX_DATA_MSGS = 1024, /**< max data msgs in a stream
*/
+ CLD_MAX_PKT_MSG_SZ = 1024,
+ CLD_MAX_PKT_MSG = 128,
+ CLD_MAX_MSG_SZ = CLD_MAX_PKT_MSG * 1024, /**< maximum total
+ msg size, including all packets */
};
/*
@@ -53,7 +56,6 @@ enum cld_msg_ops {
cmo_open = 2, /**< open file */
cmo_get_meta = 3, /**< get metadata */
cmo_get = 4, /**< get metadata + data */
- cmo_data_s = 5, /**< data message to server */
cmo_put = 6, /**< put data */
cmo_close = 7, /**< close file */
cmo_del = 8, /**< delete file */
@@ -67,7 +69,7 @@ enum cld_msg_ops {
cmo_ping = 30, /**< server to client ping */
cmo_not_master = 31, /**< I am not the master! */
cmo_event = 32, /**< server->cli async event */
- cmo_data_c = 33, /**< data message to client */
+ cmo_ack_frag = 33, /**< ack partial msg */
};
/** CLD error codes */
@@ -118,12 +120,19 @@ enum cld_lock_flags {
CLF_SHARED = (1 << 0), /**< a shared (read) lock */
};
+/** CLD packet flags */
+enum cld_packet_flags {
+ CPF_FIRST = (1 << 0), /**< first fragment */
+ CPF_LAST = (1 << 1), /**< last fragment */
+};
+
/** header for each packet */
struct cld_packet {
uint8_t magic[CLD_MAGIC_SZ]; /**< magic number; constant */
uint64_t seqid; /**< sequence id */
uint8_t sid[CLD_SID_SZ]; /**< client id */
- uint8_t res[8];
+ uint32_t flags; /**< CPF_xxx flags */
+ uint8_t res[4];
char user[CLD_MAX_USERNAME]; /**< authenticated user */
};
@@ -144,6 +153,13 @@ struct cld_msg_resp {
uint64_t xid_in; /**< C->S xid */
};
+/** ACK-FRAG message */
+struct cld_msg_ack_frag {
+ struct cld_msg_hdr hdr;
+
+ uint64_t seqid; /**< sequence id to ack */
+};
+
/** OPEN message */
struct cld_msg_open {
struct cld_msg_hdr hdr;
@@ -184,26 +200,14 @@ struct cld_msg_get_resp {
uint64_t time_modify; /**< last modification time */
uint32_t flags; /**< inode flags; CIFL_xxx */
- uint64_t strid; /**< DATA stream id */
-
/* inode name */
};
-/** DATA message */
-struct cld_msg_data {
- struct cld_msg_hdr hdr;
-
- uint64_t strid; /**< stream id */
- uint32_t seg; /**< segment number */
- uint32_t seg_len; /**< segment length */
-};
-
/** PUT message */
struct cld_msg_put {
struct cld_msg_hdr hdr;
uint64_t fh; /**< open file handle */
- uint64_t strid; /**< DATA stream id */
uint32_t data_size; /**< total size of data */
};
diff --git a/include/cldc.h b/include/cldc.h
index b98534e..8bc8295 100644
--- a/include/cldc.h
+++ b/include/cldc.h
@@ -20,27 +20,24 @@ struct cldc_call_opts {
union {
struct {
struct cld_msg_get_resp resp;
- char *buf;
+ const char *buf;
unsigned int size;
char inode_name[CLD_INODE_NAME_MAX];
} get;
} u;
};
-/** internal per-data stream information */
-struct cldc_stream {
- uint64_t strid_le; /**< stream id, LE */
- uint32_t size; /**< total bytes in stream */
- uint32_t next_seg; /**< next segment number expected */
- void *bufp; /**< pointer to next input loc */
- uint32_t size_left; /**< bytes remaining */
- struct cldc_call_opts copts; /**< call options */
- char buf[0]; /**< the raw data stream bytes */
+struct cldc_pkt_info {
+ int pkt_len;
+ int retries;
+
+ /* must be at end of struct */
+ struct cld_packet pkt;
+ uint8_t data[0];
};
/** an outgoing message, from client to server */
struct cldc_msg {
- uint64_t seqid;
uint64_t xid;
struct cldc_session *sess;
@@ -54,12 +51,12 @@ struct cldc_msg {
time_t expire_time;
- int retries;
-
int data_len;
+ int n_pkts;
+
+ struct cldc_pkt_info *pkt_info[CLD_MAX_PKT_MSG];
/* must be at end of struct */
- struct cld_packet pkt;
uint8_t data[0];
};
@@ -102,8 +99,6 @@ struct cldc_session {
GList *out_msg;
time_t msg_scan_time;
- GList *streams;
-
time_t expire_time;
bool expired;
@@ -115,6 +110,9 @@ struct cldc_session {
char secret_key[CLD_MAX_SECRET_KEY];
bool confirmed;
+
+ unsigned int msg_buf_len;
+ char msg_buf[CLD_MAX_MSG_SZ];
};
/** Information for a single CLD server host */
diff --git a/lib/cldc.c b/lib/cldc.c
index aba5795..7cb5952 100644
--- a/lib/cldc.c
+++ b/lib/cldc.c
@@ -101,6 +101,7 @@ static int ack_seqid(struct cldc_session *sess, uint64_t
seqid_le)
memcpy(pkt->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
pkt->seqid = seqid_le;
memcpy(pkt->sid, sess->sid, CLD_SID_SZ);
+ pkt->flags = GUINT32_TO_LE(CPF_FIRST | CPF_LAST);
strncpy(pkt->user, sess->user, CLD_MAX_USERNAME - 1);
resp = (struct cld_msg_hdr *) (pkt + 1);
@@ -132,7 +133,7 @@ static int cldc_rx_generic(struct cldc_session *sess,
while (tmp) {
req = tmp->data;
-#if 0 /* too verbose */
+#if 1 /* too verbose */
if (sess->verbose)
sess->act_log("rx_gen: comparing req->xid (%llu) with
resp->xid_in (%llu)\n",
(unsigned long long)
@@ -164,73 +165,45 @@ static int cldc_rx_generic(struct cldc_session *sess,
return ack_seqid(sess, pkt->seqid);
}
-static int cldc_rx_data_c(struct cldc_session *sess,
- const struct cld_packet *pkt,
- const void *msgbuf,
- size_t buflen)
+static int cldc_rx_ack_frag(struct cldc_session *sess,
+ const struct cld_packet *pkt,
+ const void *msgbuf,
+ size_t buflen)
{
- const struct cld_msg_data *data = msgbuf;
- struct cldc_stream *str = NULL;
- uint32_t seg, seg_len;
+ const struct cld_msg_ack_frag *ack_msg = msgbuf;
+ struct cldc_msg *req = NULL;
GList *tmp;
- const void *p;
- if (buflen < sizeof(*data))
+ if (buflen < sizeof(*ack_msg))
return -1008;
- seg = GUINT32_FROM_LE(data->seg);
- seg_len = GUINT32_FROM_LE(data->seg_len);
-
- if (buflen < (sizeof(*data) + seg_len))
- return -1008;
+ if (sess->verbose)
+ sess->act_log("ack-frag: seqid %llu, want to ack",
+ ack_msg->seqid);
- /* look for stream w/ our strid */
- tmp = sess->streams;
+ tmp = sess->out_msg;
while (tmp) {
- str = tmp->data;
- if (str->strid_le == data->strid)
- break;
+ int i;
+
+ req = tmp->data;
tmp = tmp->next;
- }
- /* if not found, return */
- if (!tmp)
- return -1009;
+ for (i = 0; i < req->n_pkts; i++) {
+ struct cldc_pkt_info *pi;
- /* verify segment number is what we expect */
- if (seg != str->next_seg) {
- if (sess->verbose)
- sess->act_log("rx_data: seg mismatch %u expect %u\n",
- seg, str->next_seg);
- return -1010;
- }
+ pi = req->pkt_info[i];
+ if (!pi)
+ continue;
+ if (pi->pkt.seqid != ack_msg->seqid)
+ continue;
- if (seg_len > str->size_left) {
- if (sess->verbose)
- sess->act_log("rx_data: verflow seg_len %u left %u\n",
- seg_len, str->size_left);
- return -1011;
- }
+ if (sess->verbose)
+ sess->act_log("ack-frag: seqid %llu, expiring",
+ ack_msg->seqid);
- p = data;
- p += sizeof(*data);
- memcpy(str->bufp, p, seg_len);
-
- str->bufp += seg_len;
- str->size_left -= seg_len;
- str->next_seg++;
-
- /* if terminator, process completion */
- if (seg_len == 0 && str->copts.cb) {
- if (str->size_left)
- sess->act_log("rx_data: size %u short by %u\n",
- str->size, str->size_left);
- str->copts.u.get.buf = str->buf;
- str->copts.u.get.size = str->size - str->size_left;
- str->copts.cb(&str->copts, CLE_OK);
- sess->streams = g_list_delete_link(sess->streams, tmp);
- memset(str, 0, sizeof(*str));
- free(str);
+ req->pkt_info[i] = NULL;
+ free(pi);
+ }
}
return 0;
@@ -274,6 +247,19 @@ static int cldc_rx_not_master(struct cldc_session *sess,
return -1055; /* FIXME */
}
+static void cldc_msg_free(struct cldc_msg *msg)
+{
+ int i;
+
+ if (!msg)
+ return;
+
+ for (i = 0; i < CLD_MAX_PKT_MSG; i++)
+ free(msg->pkt_info[i]);
+
+ free(msg);
+}
+
static void sess_expire_outmsg(struct cldc_session *sess, time_t current_time)
{
GList *tmp, *tmp1;
@@ -287,7 +273,7 @@ static void sess_expire_outmsg(struct cldc_session *sess,
time_t current_time)
msg = tmp1->data;
if (current_time > msg->expire_time) {
- free(msg);
+ cldc_msg_free(msg);
sess->out_msg = g_list_delete_link(sess->out_msg, tmp1);
}
}
@@ -363,7 +349,6 @@ static const char *opstr(enum cld_msg_ops op)
case cmo_open: return "cmo_open";
case cmo_get_meta: return "cmo_get_meta";
case cmo_get: return "cmo_get";
- case cmo_data_s: return "cmo_data_s";
case cmo_put: return "cmo_put";
case cmo_close: return "cmo_close";
case cmo_del: return "cmo_del";
@@ -375,17 +360,17 @@ static const char *opstr(enum cld_msg_ops op)
case cmo_ping: return "cmo_ping";
case cmo_not_master: return "cmo_not_master";
case cmo_event: return "cmo_event";
- case cmo_data_c: return "cmo_data_c";
default: return "(unknown)";
}
}
static int cldc_receive_msg(struct cldc_session *sess,
const struct cld_packet *pkt,
- size_t pkt_len,
- const struct cld_msg_hdr *msg,
- size_t msglen)
+ size_t pkt_len)
{
+ const struct cld_msg_hdr *msg = (struct cld_msg_hdr *) sess->msg_buf;
+ size_t msglen = sess->msg_buf_len;
+
switch(msg->op) {
case cmo_nop:
case cmo_close:
@@ -397,16 +382,15 @@ static int cldc_receive_msg(struct cldc_session *sess,
case cmo_new_sess:
case cmo_end_sess:
case cmo_open:
- case cmo_data_s:
case cmo_get_meta:
case cmo_get:
return cldc_rx_generic(sess, pkt, msg, msglen);
case cmo_not_master:
return cldc_rx_not_master(sess, pkt, msg, msglen);
+ case cmo_ack_frag:
+ return cldc_rx_ack_frag(sess, pkt, msg, msglen);
case cmo_event:
return cldc_rx_event(sess, pkt, msg, msglen);
- case cmo_data_c:
- return cldc_rx_data_c(sess, pkt, msg, msglen);
case cmo_ping:
return ack_seqid(sess, pkt->seqid);
case cmo_ack:
@@ -427,11 +411,13 @@ int cldc_receive_pkt(struct cldc_session *sess,
struct timeval tv;
time_t current_time;
uint64_t seqid;
+ uint32_t pkt_flags;
+ bool first_frag, last_frag, have_new_sess, no_seqid;
gettimeofday(&tv, NULL);
current_time = tv.tv_sec;
- if (pkt_len < (sizeof(*pkt) + sizeof(*msg) + SHA_DIGEST_LENGTH)) {
+ if (pkt_len < (sizeof(*pkt) + SHA_DIGEST_LENGTH)) {
if (sess->verbose)
sess->act_log("receive_pkt: msg too short\n");
return -EPROTO;
@@ -447,16 +433,15 @@ int cldc_receive_pkt(struct cldc_session *sess,
(unsigned long long)
GUINT64_FROM_LE(pkt->seqid),
pkt->user,
GUINT32_FROM_LE(dp->size));
- } else if (msg->op == cmo_data_c) {
- struct cld_msg_data *dp;
- dp = (struct cld_msg_data *) msg;
- sess->act_log("receive pkt: len %u, op cmo_data_c"
- ", seqid %llu, user %s, seg %u, len %u\n",
+ } else if (msg->op == cmo_new_sess) {
+ struct cld_msg_resp *dp;
+ dp = (struct cld_msg_resp *) msg;
+ sess->act_log("receive pkt: len %u, op cmo_new_sess"
+ ", seqid %llu, user %s, xid_in %llu\n",
(unsigned int) pkt_len,
(unsigned long long)
GUINT64_FROM_LE(pkt->seqid),
pkt->user,
- GUINT32_FROM_LE(dp->seg),
- GUINT32_FROM_LE(dp->seg_len));
+ (unsigned long long)
GUINT64_FROM_LE(dp->xid_in));
} else {
sess->act_log("receive pkt: len %u, "
"op %s, seqid %llu, user %s\n",
@@ -472,11 +457,6 @@ int cldc_receive_pkt(struct cldc_session *sess,
sess->act_log("receive_pkt: bad pkt magic\n");
return -EPROTO;
}
- if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) {
- if (sess->verbose)
- sess->act_log("receive_pkt: bad msg magic\n");
- return -EPROTO;
- }
/* check HMAC signature */
if (!authcheck(sess, pkt, pkt_len)) {
@@ -497,9 +477,34 @@ int cldc_receive_pkt(struct cldc_session *sess,
if (current_time >= sess->msg_scan_time)
sess_expire_outmsg(sess, current_time);
+ pkt_flags = GUINT32_FROM_LE(pkt->flags);
+ first_frag = pkt_flags & CPF_FIRST;
+ last_frag = pkt_flags & CPF_LAST;
+ have_new_sess = first_frag && (msg->op == cmo_new_sess);
+ no_seqid = first_frag && ((msg->op == cmo_not_master) ||
+ (msg->op == cmo_ack_frag));
+
+ if (first_frag)
+ sess->msg_buf_len = 0;
+
+ if ((sess->msg_buf_len + msglen) > CLD_MAX_MSG_SZ) {
+ if (sess->verbose)
+ sess->act_log("receive_pkt: bad pkt length\n");
+ return -EPROTO;
+ }
+
+ memcpy(sess->msg_buf + sess->msg_buf_len, msg, msglen);
+ sess->msg_buf_len += msglen;
+
+ if (memcmp(msg->magic, CLD_MSG_MAGIC, sizeof(msg->magic))) {
+ if (sess->verbose)
+ sess->act_log("receive_pkt: bad msg magic\n");
+ return -EPROTO;
+ }
+
/* verify (or set, for new-sess) sequence id */
seqid = GUINT64_FROM_LE(pkt->seqid);
- if (msg->op == cmo_new_sess) {
+ if (have_new_sess) {
sess->next_seqid_in = seqid + 1;
sess->next_seqid_in_tr =
sess->next_seqid_in - CLDC_MSG_REMEMBER;
@@ -508,7 +513,7 @@ int cldc_receive_pkt(struct cldc_session *sess,
sess->act_log("receive_pkt: "
"setting next_seqid_in to %llu\n",
(unsigned long long) seqid);
- } else if (msg->op != cmo_not_master) {
+ } else if (!no_seqid) {
if (seqid != sess->next_seqid_in) {
if (seqid_in_range(seqid,
sess->next_seqid_in_tr,
@@ -526,7 +531,10 @@ int cldc_receive_pkt(struct cldc_session *sess,
sess->expire_time = current_time + CLDC_SESS_EXPIRE;
- return cldc_receive_msg(sess, pkt, pkt_len, msg, msglen);
+ if (!last_frag)
+ return 0;
+
+ return cldc_receive_msg(sess, pkt, pkt_len);
}
static void sess_next_seqid(struct cldc_session *sess, uint64_t *seqid)
@@ -543,25 +551,54 @@ static struct cldc_msg *cldc_new_msg(struct cldc_session
*sess,
struct cldc_msg *msg;
struct cld_msg_hdr *hdr;
struct timeval tv;
+ int i, data_left;
+ void *p;
gettimeofday(&tv, NULL);
- msg = calloc(1, sizeof(*msg) + msg_len + SHA_DIGEST_LENGTH);
+ msg = calloc(1, sizeof(*msg) + msg_len);
if (!msg)
return NULL;
+ __cld_rand64(&msg->xid);
+
msg->sess = sess;
+
+ if (copts)
+ memcpy(&msg->copts, copts, sizeof(msg->copts));
+
msg->expire_time = tv.tv_sec + CLDC_MSG_EXPIRE;
- sess_next_seqid(sess, &msg->seqid);
+ msg->data_len = msg_len;
- __cld_rand64(&msg->xid);
+ msg->n_pkts = msg_len / CLD_MAX_PKT_MSG_SZ;
+ msg->n_pkts += ((msg_len % CLD_MAX_PKT_MSG_SZ) ? 1 : 0);
- msg->data_len = msg_len + SHA_DIGEST_LENGTH;
- if (copts)
- memcpy(&msg->copts, copts, sizeof(msg->copts));
+ p = msg->data;
+ data_left = msg_len;
+ for (i = 0; i < msg->n_pkts; i++) {
+ struct cldc_pkt_info *pi;
+ int pkt_len;
+
+ pkt_len = MIN(data_left, CLD_MAX_PKT_MSG_SZ);
+
+ pi = calloc(1, sizeof(*pi) + pkt_len + SHA_DIGEST_LENGTH);
+ if (!pi)
+ goto err_out;
- msg->pkt.seqid = msg->seqid;
+ pi->pkt_len = pkt_len;
+
+ memcpy(pi->pkt.magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
+ memcpy(pi->pkt.sid, sess->sid, CLD_SID_SZ);
+ strncpy(pi->pkt.user, sess->user, CLD_MAX_USERNAME - 1);
+
+ if (i == 0)
+ pi->pkt.flags |= GUINT32_TO_LE(CPF_FIRST);
+ if (i == (msg->n_pkts - 1))
+ pi->pkt.flags |= GUINT32_TO_LE(CPF_LAST);
+
+ msg->pkt_info[i] = pi;
+ }
hdr = (struct cld_msg_hdr *) &msg->data[0];
memcpy(&hdr->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
@@ -569,6 +606,10 @@ static struct cldc_msg *cldc_new_msg(struct cldc_session
*sess,
hdr->xid = msg->xid;
return msg;
+
+err_out:
+ cldc_msg_free(msg);
+ return NULL;
}
static void sess_msg_drop(struct cldc_session *sess)
@@ -583,7 +624,7 @@ static void sess_msg_drop(struct cldc_session *sess)
if (!msg->done && msg->cb)
msg->cb(msg, NULL, 0, false);
- free(msg);
+ cldc_msg_free(msg);
}
g_list_free(sess->out_msg);
@@ -614,17 +655,31 @@ static int sess_timer(struct cldc_session *sess, void
*priv)
}
while (tmp) {
+ int i;
+
msg = tmp->data;
tmp = tmp->next;
if (msg->done)
continue;
- msg->retries++;
- sess->ops->pkt_send(sess->private,
- sess->addr, sess->addr_len,
- &msg->pkt,
- sizeof(msg->pkt) + msg->data_len);
+ for (i = 0; i < msg->n_pkts; i++) {
+ struct cldc_pkt_info *pi;
+ int total_pkt_len;
+
+ pi = msg->pkt_info[i];
+ if (!pi)
+ continue;
+
+ total_pkt_len = sizeof(struct cld_packet) +
+ pi->pkt_len + SHA_DIGEST_LENGTH;
+
+ pi->retries++;
+
+ sess->ops->pkt_send(sess->private,
+ sess->addr, sess->addr_len,
+ &pi->pkt, total_pkt_len);
+ }
}
sess->ops->timer_ctl(sess->private, true, sess_timer, sess,
@@ -635,46 +690,38 @@ static int sess_timer(struct cldc_session *sess, void
*priv)
static int sess_send(struct cldc_session *sess,
struct cldc_msg *msg)
{
- struct cld_packet *pkt = &msg->pkt;
+ int i, data_left;
+ void *p;
- memcpy(pkt->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
- memcpy(pkt->sid, sess->sid, CLD_SID_SZ);
- strncpy(pkt->user, sess->user, CLD_MAX_USERNAME - 1);
+ p = msg->data;
+ data_left = msg->data_len;
+ for (i = 0; i < msg->n_pkts; i++) {
+ struct cldc_pkt_info *pi;
+ int total_pkt_len;
- /* sign message */
- if (!authsign(sess, pkt, sizeof(*pkt) + msg->data_len))
- return -1;
+ pi = msg->pkt_info[i];
+ memcpy(pi->data, p, pi->pkt_len);
- /* add to list of outgoing packets, waiting to be ack'd */
- sess->out_msg = g_list_append(sess->out_msg, msg);
+ total_pkt_len = sizeof(struct cld_packet) +
+ pi->pkt_len + SHA_DIGEST_LENGTH;
- /* attempt first send */
- if (sess->ops->pkt_send(sess->private,
- sess->addr, sess->addr_len,
- pkt, sizeof(*pkt) + msg->data_len) < 0)
- return -1;
+ sess_next_seqid(sess, &pi->pkt.seqid);
- return 0;
-}
-
-static int sess_stream_open(struct cldc_session *sess,
- uint64_t strid_le,
- uint32_t size,
- const struct cldc_call_opts *copts)
-{
- struct cldc_stream *str;
+ p += pi->pkt_len;
+ data_left -= pi->pkt_len;
- str = calloc(1, sizeof(*str) + size);
- if (!str)
- return -ENOMEM;
+ if (!authsign(sess, &pi->pkt, total_pkt_len))
+ return -1;
- str->strid_le = strid_le;
- str->size = size;
- str->size_left = size;
- str->bufp = &str->buf[0];
- memcpy(&str->copts, copts, sizeof(*copts));
+ /* attempt first send */
+ if (sess->ops->pkt_send(sess->private,
+ sess->addr, sess->addr_len,
+ &pi->pkt, total_pkt_len) < 0)
+ return -1;
+ }
- sess->streams = g_list_append(sess->streams, str);
+ /* add to list of outgoing packets, waiting to be ack'd */
+ sess->out_msg = g_list_prepend(sess->out_msg, msg);
return 0;
}
@@ -691,18 +738,11 @@ static void sess_free(struct cldc_session *sess)
tmp = sess->out_msg;
while (tmp) {
- free(tmp->data);
+ cldc_msg_free(tmp->data);
tmp = tmp->next;
}
g_list_free(sess->out_msg);
- tmp = sess->streams;
- while (tmp) {
- free(tmp->data);
- tmp = tmp->next;
- }
- g_list_free(sess->streams);
-
memset(sess, 0, sizeof(*sess));
free(sess);
}
@@ -1082,10 +1122,7 @@ int cldc_put(struct cldc_fh *fh, const struct
cldc_call_opts *copts,
struct cldc_session *sess;
struct cldc_msg *msg;
struct cld_msg_put *put;
- struct cldc_msg *datamsg[CLDC_MAX_DATA_PKTS];
- int n_pkts, i, copy_len;
- const void *p;
- size_t data_len_left = data_len;
+ int n_pkts;
if (!data || !data_len || data_len > CLDC_MAX_DATA_SZ)
return -EINVAL;
@@ -1101,58 +1138,20 @@ int cldc_put(struct cldc_fh *fh, const struct
cldc_call_opts *copts,
sess = fh->sess;
/* create PUT message */
- msg = cldc_new_msg(sess, copts, cmo_put, sizeof(struct cld_msg_put));
+ msg = cldc_new_msg(sess, copts, cmo_put,
+ sizeof(struct cld_msg_put) + data_len);
if (!msg)
return -ENOMEM;
put = (struct cld_msg_put *) msg->data;
put->fh = fh->fh_le;
- __cld_rand64(&put->strid);
put->data_size = GUINT32_TO_LE(data_len);
- memset(datamsg, 0, sizeof(datamsg));
-
- p = data;
- for (i = 0; i < n_pkts; i++) {
- struct cld_msg_data *dm;
- void *q;
-
- /* create DATA message for this segment */
- datamsg[i] = cldc_new_msg(sess, copts, cmo_data_s,
- CLDC_MAX_DATA_PKT_SZ);
- if (!datamsg[i])
- goto err_out;
-
- if (i == (n_pkts - 1))
- datamsg[i]->cb = generic_end_cb;
-
- dm = (struct cld_msg_data *) datamsg[i]->data;
- q = dm;
- q += sizeof(struct cld_msg_data);
-
- copy_len = MIN(CLDC_MAX_DATA_PKT_SZ, data_len_left);
- memcpy(q, p, copy_len);
-
- p += copy_len;
- data_len_left -= copy_len;
-
- dm->strid = put->strid;
- dm->seg = GUINT32_TO_LE(i);
- dm->seg_len = GUINT32_TO_LE(copy_len);
- }
+ memcpy((put + 1), data, data_len);
sess_send(sess, msg);
- for (i = 0; i < n_pkts; i++)
- sess_send(sess, datamsg[i]);
return 0;
-
-err_out:
- for (i = 0; i < n_pkts; i++)
- if (datamsg[i])
- free(datamsg[i]);
- free(msg);
- return -ENOMEM;
}
#undef XC32
@@ -1175,8 +1174,11 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const
void *resp_p,
resp_rc = GUINT32_FROM_LE(resp->resp.code);
if (resp_rc == CLE_OK) {
+ bool get_body;
+
o = &msg->copts.u.get.resp;
+ get_body = (resp->resp.hdr.op == cmo_get);
msg->copts.op = cmo_get;
/* copy-and-swap */
@@ -1190,10 +1192,19 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const
void *resp_p,
/* copy inode name */
if (o->ino_len <= sizeof(CLD_INODE_NAME_MAX)) {
+ size_t diffsz;
const void *p;
+
p = resp;
p += sizeof(struct cld_msg_get_resp);
memcpy(&msg->copts.u.get.inode_name, p, o->ino_len);
+
+ p += o->ino_len;
+ diffsz = p - resp_p;
+
+ /* point to internal buffer holding GET data */
+ msg->copts.u.get.buf = msg->sess->msg_buf + diffsz;
+ msg->copts.u.get.size = msg->sess->msg_buf_len - diffsz;
} else {
o->ino_len = 0; /* Probably full of garbage */
}
@@ -1206,9 +1217,6 @@ static ssize_t get_end_cb(struct cldc_msg *msg, const
void *resp_p,
return 0;
}
- sess_stream_open(msg->sess, resp->strid, o->size,
- &msg->copts);
-
return 0;
}
#undef XC
diff --git a/server/cld.h b/server/cld.h
index ed2ddc8..505b02a 100644
--- a/server/cld.h
+++ b/server/cld.h
@@ -70,9 +70,6 @@ struct session {
uint64_t next_seqid_in;
uint64_t next_seqid_out;
- GList *put_q; /* queued PUT pkts */
- GList *data_q; /* queued data pkts */
-
GList *out_q; /* outgoing pkts (to client) */
struct timer retry_timer;
@@ -80,6 +77,10 @@ struct session {
bool ping_open; /* sent PING, waiting for ack */
bool dead; /* session has ended */
+
+ /* huge buffer should always come last */
+ unsigned int msg_buf_len;
+ char msg_buf[CLD_MAX_MSG_SZ];
};
struct msg_params {
@@ -134,7 +135,6 @@ struct server {
extern int inode_lock_rescan(DB_TXN *txn, cldino_t inum);
extern void msg_open(struct msg_params *);
extern void msg_put(struct msg_params *);
-extern void msg_data(struct msg_params *);
extern void msg_close(struct msg_params *);
extern void msg_del(struct msg_params *);
extern void msg_unlock(struct msg_params *);
diff --git a/server/msg.c b/server/msg.c
index 4f0e583..fcb8927 100644
--- a/server/msg.c
+++ b/server/msg.c
@@ -447,7 +447,8 @@ void msg_get(struct msg_params *mp, bool metadata_only)
name_len = GUINT32_FROM_LE(inode->ino_len);
- resp_len = sizeof(*resp) + name_len + SHA_DIGEST_LENGTH;
+ resp_len = sizeof(*resp) + name_len + SHA_DIGEST_LENGTH +
+ (metadata_only ? 0 : GUINT32_TO_LE(inode->size));
resp = alloca(resp_len);
if (!resp) {
resp_rc = CLE_OOM;
@@ -463,17 +464,11 @@ void msg_get(struct msg_params *mp, bool metadata_only)
resp->time_create = inode->time_create;
resp->time_modify = inode->time_modify;
resp->flags = inode->flags;
- resp->strid = rand_strid;
memcpy(resp+1, inode+1, name_len);
- sess_sendmsg(sess, resp, resp_len, NULL, NULL);
-
- /* send one or more data packets, if necessary */
+ /* send data, if requested */
if (!metadata_only) {
- int i, seg_len;
void *p;
- char dbuf[CLD_MAX_UDP_SEG];
- struct cld_msg_data *dr = (struct cld_msg_data *) &dbuf;
rc = cldb_data_get(txn, inum, &data_mem, &data_mem_len,
false, false);
@@ -484,41 +479,18 @@ void msg_get(struct msg_params *mp, bool metadata_only)
if (rc == DB_NOTFOUND) {
data_mem = NULL;
data_mem_len = 0;
- } else if (rc) {
+ } else if (rc || (data_mem_len != GUINT32_TO_LE(inode->size))) {
resp_rc = CLE_DB_ERR;
goto err_out;
}
- /* copy the GET msg's hdr, then change op to DATA */
- memset(dr, 0, sizeof(*dr));
- dr->hdr.op = cmo_data_c;
-
- i = 0;
- p = data_mem;
-
- /* break up data_mem into individual packets */
- while (data_mem_len > 0) {
- seg_len = MIN(CLD_MAX_UDP_SEG - sizeof(*dr),
data_mem_len);
-
- dr->strid = rand_strid;
- dr->seg = GUINT32_TO_LE(i);
- dr->seg_len = GUINT32_TO_LE(seg_len);
- memcpy(dbuf + sizeof(*dr), p, seg_len);
-
- i++;
- p += seg_len;
- data_mem_len -= seg_len;
-
- sess_sendmsg(sess, dr, seg_len + sizeof(*dr), NULL,
NULL);
- }
-
- /* send terminating packet (seg_len == 0) */
- dr->strid = rand_strid;
- dr->seg = GUINT32_TO_LE(i);
- dr->seg_len = 0;
- sess_sendmsg(sess, dr, sizeof(*dr), NULL, NULL);
+ p = (resp + 1);
+ p += name_len;
+ memcpy(p, data_mem, data_mem_len);
}
+ sess_sendmsg(sess, resp, resp_len, NULL, NULL);
+
rc = txn->commit(txn, 0);
if (rc)
dbenv->err(dbenv, rc, "msg_get read-only txn commit");
@@ -752,122 +724,33 @@ err_out_noabort:
free(raw_sess);
}
-static void try_commit_data(struct msg_params *mp,
- uint64_t strid, GList *pmsg_ent)
+void msg_put(struct msg_params *mp)
{
- struct cld_msg_put *pmsg = pmsg_ent->data;
- struct cld_msg_data *dmsg;
- GList *tmp, *tmp1;
- uint32_t data_size, tmp_size, tmp_seg = 0;
- int last_seg, nseg, rc, i;
+ const struct cld_msg_put *msg = mp->msg;
+ struct session *sess = mp->sess;
+ uint64_t fh;
struct raw_handle *h = NULL;
struct raw_inode *inode = NULL;
- cldino_t inum;
- uint64_t fh;
enum cle_err_codes resp_rc = CLE_OK;
- void *mem, *p, *q;
- struct cld_msg_data **darr;
- struct session *sess = mp->sess;
- bool have_end_seg = false;
+ const void *mem;
+ int rc;
+ cldino_t inum;
+ uint32_t omode, data_size;
DB_ENV *dbenv = cld_srv.cldb.env;
DB_TXN *txn;
- data_size = GUINT32_FROM_LE(pmsg->data_size);
- tmp_size = 0;
- last_seg = 0;
- nseg = 0;
-
- /*
- * Pass 1: count total size of all packets in our stream;
- * count number of segments in stream.
- */
- tmp = sess->data_q;
- while (tmp) {
- uint32_t tmp_seg_len;
-
- dmsg = tmp->data;
- tmp = tmp->next;
-
- /* non-matching strid[] implies not-our-stream */
- if (dmsg->strid != strid)
- continue;
-
- tmp_seg = GUINT32_FROM_LE(dmsg->seg);
- if (tmp_seg >= CLD_MAX_DATA_MSGS)
- break;
- if (tmp_seg > last_seg)
- last_seg = tmp_seg;
-
- tmp_seg_len = GUINT32_FROM_LE(dmsg->seg_len);
- if (tmp_seg_len == 0)
- have_end_seg = true;
- else
- tmp_size += tmp_seg_len;
- nseg++;
- if (nseg > CLD_MAX_DATA_MSGS)
- break;
- }
-
- if (debugging)
- cldlog(LOG_DEBUG,
- " data scan: end %d nseg %u last %u len %u/%u",
- have_end_seg, nseg, last_seg, tmp_size, data_size);
-
- /* return if data stream not yet 100% received */
- if (!have_end_seg || tmp_size < data_size)
- return; /* nothing to do */
+ /* make sure input data as large as message header */
+ if (mp->msg_len < sizeof(*msg))
+ return;
- /* stream parameter bounds checking */
- if ((tmp_seg >= CLD_MAX_DATA_MSGS) ||
- (nseg > CLD_MAX_DATA_MSGS) ||
- (tmp_size > data_size)) {
- resp_rc = CLE_DATA_INVAL;
+ /* make sure additional input data as large as expected */
+ data_size = GUINT32_FROM_LE(msg->data_size);
+ if (mp->msg_len != (data_size + sizeof(*msg))) {
+ resp_rc = CLE_BAD_PKT;
goto err_out_noabort;
}
- /* create array to store pointers to each data packet */
- darr = alloca(nseg * sizeof(struct cld_msg_data *));
- memset(darr, 0, nseg * sizeof(struct cld_msg_data *));
-
- sess->put_q = g_list_delete_link(sess->put_q, pmsg_ent);
-
- /*
- * Pass 2: store packets in array, sorted by segment number
- */
- tmp = sess->data_q;
- while (tmp) {
- dmsg = tmp->data;
- tmp1 = tmp;
- tmp = tmp->next;
-
- /* non-matching strid[] implies not-our-stream */
- if (dmsg->strid != strid)
- continue;
-
- /* remove data packet from data msg queue */
- sess->data_q = g_list_delete_link(sess->data_q, tmp1);
-
- tmp_seg = GUINT32_FROM_LE(dmsg->seg);
-
- /* prevent duplicate segment numbers */
- if (darr[tmp_seg]) {
- resp_rc = CLE_DATA_INVAL;
- goto err_out_noabort;
- }
- darr[tmp_seg] = dmsg;
- }
-
- /* final check for missing segments; if segments are missing
- * at this point, it is a corrupted/malicious data stream,
- * because it passed other checks following Pass #1
- */
- for (i = 0; i < nseg; i++)
- if (!darr[i]) {
- resp_rc = CLE_DATA_INVAL;
- goto err_out_noabort;
- }
-
- fh = GUINT64_FROM_LE(pmsg->fh);
+ fh = GUINT64_FROM_LE(msg->fh);
rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
if (rc) {
@@ -884,6 +767,13 @@ static void try_commit_data(struct msg_params *mp,
}
inum = cldino_from_le(h->inum);
+ omode = GUINT32_FROM_LE(h->mode);
+
+ if ((!(omode & COM_WRITE)) ||
+ (omode & COM_DIRECTORY)) {
+ resp_rc = CLE_MODE_INVAL;
+ goto err_out;
+ }
/* read inode from db */
rc = cldb_inode_get(txn, inum, &inode, false, DB_RMW);
@@ -892,26 +782,8 @@ static void try_commit_data(struct msg_params *mp,
goto err_out;
}
- /* create contig. memory area sized to contain entire data stream */
- p = mem = malloc(data_size);
- if (!mem) {
- resp_rc = CLE_OOM;
- goto err_out;
- }
-
- /* loop through array, copying each data packet into contig. area */
- for (i = 0; i <= last_seg; i++) {
- dmsg = darr[i];
- q = dmsg;
-
- tmp_size = GUINT32_FROM_LE(dmsg->seg_len);
- memcpy(p, q + sizeof(*dmsg), tmp_size);
- p += tmp_size;
-
- free(dmsg);
- }
-
/* store contig. data area in db */
+ mem = (msg + 1);
rc = cldb_data_put(txn, inum, mem, data_size, 0);
if (rc) {
resp_rc = CLE_DB_ERR;
@@ -934,159 +806,6 @@ static void try_commit_data(struct msg_params *mp,
goto err_out_noabort;
}
- resp_ok(sess, mp->msg);
- free(pmsg);
- free(h);
- free(inode);
- return;
-
-err_out:
- rc = txn->abort(txn);
- if (rc)
- dbenv->err(dbenv, rc, "commit txn abort");
-err_out_noabort:
- resp_err(sess, mp->msg, resp_rc);
- free(pmsg);
- free(h);
- free(inode);
-}
-
-void msg_data(struct msg_params *mp)
-{
- const struct cld_msg_data *msg = mp->msg;
- struct session *sess = mp->sess;
- GList *tmp;
- void *mem = NULL;
- enum cle_err_codes resp_rc = CLE_OK;
- uint32_t seg_len;
-
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- seg_len = GUINT32_FROM_LE(msg->seg_len);
-
- if (mp->msg_len < (sizeof(*msg) + seg_len))
- return;
-
- if (debugging)
- cldlog(LOG_DEBUG, " data strid %016llx",
- (unsigned long long) msg->strid);
-
- /* search for PUT message with strid == our strid; that is how we
- * associate DATA messages with the initial PUT msg
- */
- tmp = sess->put_q;
- while (tmp) {
- struct cld_msg_put *pmsg;
-
- pmsg = tmp->data;
- if (pmsg->strid == msg->strid)
- break;
-
- tmp = tmp->next;
- }
-
- if (!tmp) {
- resp_rc = CLE_DATA_INVAL;
- goto err_out;
- }
-
- /* copy DATA msg */
- mem = malloc(mp->msg_len);
- if (!mem) {
- resp_rc = CLE_OOM;
- goto err_out;
- }
-
- memcpy(mem, mp->msg, mp->msg_len);
-
- /* store DATA message on DATA msg queue */
- sess->data_q = g_list_append(sess->data_q, mem);
-
- resp_ok(sess, mp->msg);
-
- /* scan DATA queue for completed stream; commit to db, if found */
- try_commit_data(mp, msg->strid, tmp);
- return;
-
-err_out:
- resp_err(sess, mp->msg, resp_rc);
-}
-
-void msg_put(struct msg_params *mp)
-{
- const struct cld_msg_put *msg = mp->msg;
- struct session *sess = mp->sess;
- uint64_t fh;
- struct raw_handle *h = NULL;
- struct raw_inode *inode = NULL;
- enum cle_err_codes resp_rc = CLE_OK;
- void *mem;
- int rc;
- cldino_t inum;
- uint32_t omode;
- DB_ENV *dbenv = cld_srv.cldb.env;
- DB_TXN *txn;
-
- /* make sure input data as large as expected */
- if (mp->msg_len < sizeof(*msg))
- return;
-
- fh = GUINT64_FROM_LE(msg->fh);
-
- rc = dbenv->txn_begin(dbenv, NULL, &txn, 0);
- if (rc) {
- dbenv->err(dbenv, rc, "DB_ENV->txn_begin");
- resp_rc = CLE_DB_ERR;
- goto err_out_noabort;
- }
-
- /* read handle from db, for validation */
- rc = cldb_handle_get(txn, sess->sid, fh, &h, 0);
- if (rc) {
- resp_rc = CLE_FH_INVAL;
- goto err_out;
- }
-
- inum = cldino_from_le(h->inum);
- omode = GUINT32_FROM_LE(h->mode);
-
- if ((!(omode & COM_WRITE)) ||
- (omode & COM_DIRECTORY)) {
- resp_rc = CLE_MODE_INVAL;
- goto err_out;
- }
-
- /* read inode from db, for validation */
- rc = cldb_inode_get(txn, inum, &inode, false, 0);
- if (rc) {
- resp_rc = CLE_INODE_INVAL;
- goto err_out;
- }
-
- rc = txn->commit(txn, 0);
- if (rc)
- dbenv->err(dbenv, rc, "msg_put read-only txn commit");
-
- /* copy message */
- mem = malloc(mp->msg_len);
- if (!mem) {
- resp_rc = CLE_OOM;
- goto err_out;
- }
-
- memcpy(mem, msg, mp->msg_len);
-
- /* store PUT message in PUT msg queue */
- sess->put_q = g_list_append(sess->put_q, mem);
-
- /*
- * In all other cases we ack here, but in put we do it in
- * try_to_commit_data, so that we can report the result of commit.
- */
- // resp_ok(sess, mp->msg);
-
free(h);
free(inode);
return;
diff --git a/server/server.c b/server/server.c
index c19156b..4595c80 100644
--- a/server/server.c
+++ b/server/server.c
@@ -220,7 +220,6 @@ const char *opstr(enum cld_msg_ops op)
case cmo_open: return "cmo_open";
case cmo_get_meta: return "cmo_get_meta";
case cmo_get: return "cmo_get";
- case cmo_data_s: return "cmo_data_s";
case cmo_put: return "cmo_put";
case cmo_close: return "cmo_close";
case cmo_del: return "cmo_del";
@@ -232,7 +231,7 @@ const char *opstr(enum cld_msg_ops op)
case cmo_ping: return "cmo_ping";
case cmo_not_master: return "cmo_not_master";
case cmo_event: return "cmo_event";
- case cmo_data_c: return "cmo_data_c";
+ case cmo_ack_frag: return "cmo_ack_frag";
default: return "(unknown)";
}
}
@@ -253,7 +252,6 @@ static void udp_rx_msg(const struct client *cli, const struct cld_packet *pkt,
case cmo_get: msg_get(mp, false); break;
case cmo_get_meta: msg_get(mp, true); break;
case cmo_put: msg_put(mp); break;
- case cmo_data_s: msg_data(mp); break;
case cmo_close: msg_close(mp); break;
case cmo_del: msg_del(mp); break;
case cmo_unlock: msg_unlock(mp); break;
@@ -267,6 +265,45 @@ static void udp_rx_msg(const struct client *cli, const struct cld_packet *pkt,
}
}
+/*
+ * Acknowledge a received, non-final message fragment with a single
+ * cmo_ack_frag message echoing its seqid; nothing is sent if signing fails.
+ */
+static void pkt_ack_frag(struct server_socket *sock,
+ const struct client *cli,
+ const struct cld_packet *pkt)
+{
+ size_t alloc_len;
+ struct cld_packet *outpkt;
+ struct cld_msg_ack_frag *ack_msg;
+
+ alloc_len = sizeof(*outpkt) + sizeof(*ack_msg) + SHA_DIGEST_LENGTH;
+ outpkt = alloca(alloc_len);
+ ack_msg = (struct cld_msg_ack_frag *) (outpkt + 1);
+ memset(outpkt, 0, alloc_len);
+
+ pkt_init_pkt(outpkt, pkt);
+
+ memcpy(ack_msg->hdr.magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
+ __cld_rand64(&ack_msg->hdr.xid);
+ ack_msg->hdr.op = cmo_ack_frag;
+ ack_msg->seqid = pkt->seqid;
+
+ if (!authsign(outpkt, alloc_len))
+ return;
+
+ if (debugging)
+ cldlog(LOG_DEBUG, "ack-partial-msg: "
+ "sid " SIDFMT ", op %s, seqid %llu",
+ SIDARG(outpkt->sid),
+ opstr(ack_msg->hdr.op),
+ (unsigned long long) GUINT64_FROM_LE(outpkt->seqid));
+
+ /* transmit ack-partial-msg response (once, without retries) */
+ udp_tx(sock, (struct sockaddr *) &cli->addr, cli->addr_len,
+ outpkt, alloc_len);
+}
+
static void udp_rx(struct server_socket *sock,
const struct client *cli,
const void *raw_pkt, size_t pkt_len)
@@ -279,6 +316,8 @@ static void udp_rx(struct server_socket *sock,
struct cld_msg_resp *resp;
struct msg_params mp;
size_t alloc_len;
+ uint32_t pkt_flags;
+ bool first_frag, have_new_sess, have_ack, have_put;
/* drop all completely corrupted packets */
if ((pkt_len < (sizeof(*pkt) + sizeof(*msg) + SHA_DIGEST_LENGTH)) ||
@@ -292,6 +331,12 @@ static void udp_rx(struct server_socket *sock,
goto err_out;
}
+ pkt_flags = GUINT32_FROM_LE(pkt->flags);
+ first_frag = pkt_flags & CPF_FIRST;
+ have_new_sess = first_frag && (msg->op == cmo_new_sess);
+ have_ack = first_frag && (msg->op == cmo_ack);
+ have_put = first_frag && (msg->op == cmo_put);
+
/* look up client session, verify it matches IP and username */
sess = g_hash_table_lookup(cld_srv.sessions, pkt->sid);
if (sess &&
@@ -311,28 +356,26 @@ static void udp_rx(struct server_socket *sock,
mp.msg_len = pkt_len - sizeof(*pkt);
if (debugging) {
- if (msg->op == cmo_data_s) {
- struct cld_msg_data *dp = (struct cld_msg_data *) msg;
- cldlog(LOG_DEBUG,
- " msg op %s, seqid %llu, seg %u, len %u",
- opstr(msg->op),
- (unsigned long long) GUINT64_FROM_LE(pkt->seqid),
- GUINT32_FROM_LE(dp->seg),
- GUINT32_FROM_LE(dp->seg_len));
- } else if (msg->op == cmo_put) {
+ if (have_put) {
struct cld_msg_put *dp = (struct cld_msg_put *) msg;
- cldlog(LOG_DEBUG, " msg op %s, seqid %llu, size %u",
+ cldlog(LOG_DEBUG, " msg op %s, seqid %llu, xid %llu, size %u",
opstr(msg->op),
(unsigned long long) GUINT64_FROM_LE(pkt->seqid),
+ (unsigned long long) GUINT64_FROM_LE(msg->xid),
GUINT32_FROM_LE(dp->data_size));
- } else {
- cldlog(LOG_DEBUG, " msg op %s, seqid %llu",
+ } else if (first_frag) {
+ cldlog(LOG_DEBUG, " msg op %s, seqid %llu, xid %llu",
opstr(msg->op),
+ (unsigned long long) GUINT64_FROM_LE(pkt->seqid),
+ (unsigned long long) GUINT64_FROM_LE(msg->xid));
+ } else {
+ cldlog(LOG_DEBUG, " seqid %llu",
(unsigned long long)
GUINT64_FROM_LE(pkt->seqid));
}
}
- if (msg->op != cmo_new_sess) {
+ /* advance sequence ids and update last-contact timestamp */
+ if (!have_new_sess) {
if (!sess) {
resp_rc = CLE_SESS_INVAL;
goto err_out;
@@ -341,7 +384,7 @@ static void udp_rx(struct server_socket *sock,
sess->last_contact = current_time.tv_sec;
sess->sock = sock; /* FIXME refcount for changed sockets */
- if (msg->op != cmo_ack) {
+ if (!have_ack) {
/* eliminate duplicates; do not return any response */
if (GUINT64_FROM_LE(pkt->seqid) != sess->next_seqid_in) {
if (debugging)
@@ -366,6 +409,36 @@ static void udp_rx(struct server_socket *sock,
}
}
+ /* copy message fragment into reassembly buffer */
+ if (sess) {
+ /* mp.msg_len also covers this packet's SHA-1 signature trailer,
+ * which is per-packet and must not be reassembled */
+ size_t frag_len = mp.msg_len - SHA_DIGEST_LENGTH;
+
+ if (first_frag)
+ sess->msg_buf_len = 0;
+
+ if ((sess->msg_buf_len + frag_len) > CLD_MAX_MSG_SZ) {
+ resp_rc = CLE_BAD_PKT;
+ goto err_out;
+ }
+
+ memcpy(&sess->msg_buf[sess->msg_buf_len], msg, frag_len);
+ sess->msg_buf_len += frag_len;
+
+ if (!(pkt_flags & CPF_LAST)) {
+ pkt_ack_frag(sock, cli, pkt);
+ return;
+ }
+
+ mp.msg = msg = (struct cld_msg_hdr *) sess->msg_buf;
+ mp.msg_len = sess->msg_buf_len;
+
+ if (debugging)
+ cldlog(LOG_DEBUG, " final message size %u",
+ sess->msg_buf_len);
+ }
+
udp_rx_msg(cli, pkt, msg, &mp);
return;
diff --git a/server/session.c b/server/session.c
index 5096b25..b08425b 100644
--- a/server/session.c
+++ b/server/session.c
@@ -67,6 +67,7 @@ void pkt_init_pkt(struct cld_packet *dest, const struct cld_packet *src)
memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
dest->seqid = GUINT64_TO_LE(0xdeadbeef);
memcpy(dest->sid, src->sid, CLD_SID_SZ);
+ dest->flags = GUINT32_TO_LE(CPF_FIRST | CPF_LAST);
strncpy(dest->user, src->user, CLD_MAX_USERNAME - 1);
}
@@ -76,6 +77,7 @@ static void pkt_init_sess(struct cld_packet *dest, struct session *sess)
memcpy(dest->magic, CLD_PKT_MAGIC, CLD_MAGIC_SZ);
dest->seqid = next_seqid_le(&sess->next_seqid_out);
memcpy(dest->sid, sess->sid, CLD_SID_SZ);
+ dest->flags = 0;
strncpy(dest->user, sess->user, CLD_MAX_USERNAME - 1);
}
@@ -128,6 +130,8 @@ static void session_free(struct session *sess)
static void session_trash(struct session *sess)
{
+ cldlog(LOG_INFO, "session " SIDFMT " sent to garbage",
+ SIDARG(sess->sid));
sess->dead = true;
}
@@ -562,14 +566,28 @@ static void session_retry(struct timer *timer)
timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
}
+static void session_outq(struct session *sess, GList *new_pkts)
+{
+ /* if out_q empty, start retry timer */
+ if (!sess->out_q)
+ timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
+
+ sess->out_q = g_list_concat(sess->out_q, new_pkts);
+}
+
bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
void (*done_cb)(struct session_outpkt *),
void *done_data)
{
struct cld_packet *outpkt;
- struct cld_msg_hdr *msg;
- struct session_outpkt *op;
- size_t pkt_len;
+ size_t n_pkts, i;
+ size_t pkt_len, msg_left = msglen;
+ struct session_outpkt *pkts[CLD_MAX_PKT_MSG], *op;
+ GList *tmp_root = NULL, *l;
+ const unsigned char *p;
+
+ n_pkts = (msglen / CLD_MAX_PKT_MSG_SZ);
+ n_pkts += (msglen % CLD_MAX_PKT_MSG_SZ) ? 1 : 0;
if (debugging) {
const struct cld_msg_hdr *hdr = msg_;
@@ -587,16 +605,18 @@ bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
case cmo_new_sess:
case cmo_end_sess:
case cmo_open:
- case cmo_data_s:
case cmo_get_meta:
case cmo_get:
rsp = (struct cld_msg_resp *) msg_;
cldlog(LOG_DEBUG, "sendmsg: "
- "sid " SIDFMT ", op %s, msglen %u, code %u",
+ "sid " SIDFMT ", op %s, msglen %u, code %u, "
+ "xid %llu, xid_in %llu",
SIDARG(sess->sid),
opstr(hdr->op),
(unsigned int) msglen,
- GUINT32_FROM_LE(rsp->code));
+ GUINT32_FROM_LE(rsp->code),
+ (unsigned long long) GUINT64_FROM_LE(hdr->xid),
+ (unsigned long long) GUINT64_FROM_LE(rsp->xid_in));
break;
default:
cldlog(LOG_DEBUG, "sendmsg: "
@@ -607,48 +627,79 @@ bool sess_sendmsg(struct session *sess, const void *msg_, size_t msglen,
}
}
- op = op_alloc(sizeof(*outpkt) + msglen + SHA_DIGEST_LENGTH);
- if (!op)
+ /* an empty message has nothing to send; a huge one cannot be sent */
+ if (!n_pkts || n_pkts > CLD_MAX_PKT_MSG)
return false;
- op->sess = sess;
- op->done_cb = done_cb;
- op->done_data = done_data;
+ /* pass 1: perform allocations */
+ for (i = 0; i < n_pkts; i++) {
+ pkts[i] = op = op_alloc(sizeof(*outpkt) +
+ CLD_MAX_PKT_MSG_SZ +
+ SHA_DIGEST_LENGTH);
+ if (!op)
+ goto err_out;
- outpkt = op->pkt;
- pkt_len = op->pkt_len;
+ tmp_root = g_list_append(tmp_root, op);
+ }
- msg = (struct cld_msg_hdr *) (outpkt + 1);
+ /* pass 2: fill and sign packets */
+ p = msg_;
+ for (i = 0; i < n_pkts; i++) {
+ size_t copy_len;
- /* init packet header */
- pkt_init_sess(outpkt, sess);
+ op = pkts[i];
- /* init message header */
- memcpy(msg->magic, CLD_MSG_MAGIC, CLD_MAGIC_SZ);
- msg->op = ((struct cld_msg_hdr *)msg_)->op;
+ op->sess = sess;
+
+ outpkt = op->pkt;
- /* copy message trailer */
- memcpy(msg + 1, msg_ + sizeof(*msg), msglen - sizeof(*msg));
+ /* init packet header */
+ pkt_init_sess(outpkt, sess);
- op->pkt = outpkt;
- op->pkt_len = pkt_len;
- op->next_retry = current_time.tv_sec + CLD_RETRY_START;
+ if (i == 0)
+ outpkt->flags |= GUINT32_TO_LE(CPF_FIRST);
- if (!authsign(outpkt, pkt_len)) {
- op_unref(op);
- return false;
- }
+ /* the buffer holds CLD_MAX_PKT_MSG_SZ message bytes plus the
+ * signature; never copy message data into the signature area */
+ copy_len = MIN(CLD_MAX_PKT_MSG_SZ, msg_left);
+ memcpy(outpkt + 1, p, copy_len);
- /* if out_q empty, start retry timer */
- if (!sess->out_q)
- timer_add(&sess->retry_timer, time(NULL) + CLD_RETRY_START);
+
+ p += copy_len;
+ msg_left -= copy_len;
+
+ op->pkt_len =
+ pkt_len = sizeof(*outpkt) + copy_len + SHA_DIGEST_LENGTH;
- sess->out_q = g_list_append(sess->out_q, op);
+ if (!msg_left) {
+ op->done_cb = done_cb;
+ op->done_data = done_data;
- udp_tx(sess->sock, (struct sockaddr *) &sess->addr,
- sess->addr_len, outpkt, pkt_len);
+ outpkt->flags |= GUINT32_TO_LE(CPF_LAST);
+ }
+
+ op->next_retry = current_time.tv_sec + CLD_RETRY_START;
+
+ if (!authsign(outpkt, pkt_len))
+ goto err_out;
+ }
+
+ /* pass 3: transmit only once every packet is signed, so a signing
+ * failure never leaves a partially-sent message behind */
+ for (i = 0; i < n_pkts; i++)
+ udp_tx(sess->sock, (struct sockaddr *) &sess->addr,
+ sess->addr_len, pkts[i]->pkt, pkts[i]->pkt_len);
+
+ session_outq(sess, tmp_root);
return true;
+
+err_out:
+ /* tmp_root lists exactly the packets allocated so far */
+ for (l = tmp_root; l; l = l->next)
+ op_unref(l->data);
+ g_list_free(tmp_root);
+ return false;
}
void msg_ack(struct msg_params *mp)
diff --git a/test/it-works.c b/test/it-works.c
index b635fcb..dc8a0e7 100644
--- a/test/it-works.c
+++ b/test/it-works.c
@@ -86,7 +86,7 @@ static int init(void)
if (rc)
return rc;
- // udp->sess->verbose = true;
+ udp->sess->verbose = true; /* NOTE(review): verbose client output enabled for this WIP patch; presumably revert before merge */
event_set(&udp_ev, udp->fd, EV_READ | EV_PERSIST, udp_event, udp);
diff --git a/test/load-file-event.c b/test/load-file-event.c
index bbd37c2..f0c1e10 100644
--- a/test/load-file-event.c
+++ b/test/load-file-event.c
@@ -214,7 +214,7 @@ static int init(char *name)
if (rc)
return rc;
- // run.udp->sess->verbose = true;
+ run.udp->sess->verbose = true; /* NOTE(review): verbose client output enabled for this WIP patch; presumably revert before merge */
event_set(&run.udp_ev, run.udp->fd, EV_READ | EV_PERSIST,
udp_event, &run);
diff --git a/test/lock-file-event.c b/test/lock-file-event.c
index 0166c39..76ffd8d 100644
--- a/test/lock-file-event.c
+++ b/test/lock-file-event.c
@@ -224,7 +224,7 @@ static int init(void)
if (rc)
return rc;
- // run.udp->sess->verbose = true;
+ run.udp->sess->verbose = true; /* NOTE(review): verbose client output enabled for this WIP patch; presumably revert before merge */
event_set(&run.udp_ev, run.udp->fd, EV_READ | EV_PERSIST,
udp_event, &run);
diff --git a/test/save-file-event.c b/test/save-file-event.c
index 58f5054..e1448bf 100644
--- a/test/save-file-event.c
+++ b/test/save-file-event.c
@@ -220,7 +220,7 @@ static int init(char *name)
if (rc)
return rc;
- // run.udp->sess->verbose = true;
+ run.udp->sess->verbose = true; /* NOTE(review): verbose client output enabled for this WIP patch; presumably revert before merge */
event_set(&run.udp_ev, run.udp->fd, EV_READ | EV_PERSIST,
udp_event, &run);
diff --git a/test/start-daemon b/test/start-daemon
index 4cb9fd7..010f7db 100755
--- a/test/start-daemon
+++ b/test/start-daemon
@@ -6,7 +6,7 @@ then
exit 1
fi
-../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E
+../server/cld -P cld.pid -d "$PWD/data" -p 18181 -E -D 2 # NOTE(review): -D 2 added for this WIP patch (presumably a debug level) -- confirm and revert before merge
sleep 3
--
To unsubscribe from this list: send the line "unsubscribe hail-devel" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at http://vger.kernel.org/majordomo-info.html