Changeset: db2c06e5287d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/db2c06e5287d
Modified Files:
common/stream/mapi_stream.c
Branch: default
Log Message:
Fully implement mapi_request_upload
diffs (153 lines):
diff --git a/common/stream/mapi_stream.c b/common/stream/mapi_stream.c
--- a/common/stream/mapi_stream.c
+++ b/common/stream/mapi_stream.c
@@ -9,20 +9,146 @@
#include "monetdb_config.h"
#include "stream.h"
#include "stream_internal.h"
+#include "mapi_prompt.h"
+
+
+static void
+discard(stream *s)
+{
+ static char bitbucket[8192];
+ while (1) {
+ ssize_t nread = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
+ if (nread <= 0)
+ return;
+ assert(1);
+ }
+}
struct mapi_recv_upload {
stream *from_client; // set to NULL after sending MAPI_PROMPT3
stream *to_client; // set to NULL when client sends empty
};
+static ssize_t
+recv_upload_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+ struct mapi_recv_upload *state = s->stream_data.p;
+ if (state->from_client == NULL) {
+ assert(s->eof);
+ return 0;
+ }
+
+ ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+ if (nread != 0 || state->from_client->eof)
+ return nread;
+
+ // before returning the 0 we send the prompt and make another attempt.
+ if (
+ mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2),
1) != 1
+ || mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
+ ) {
+ mnstr_set_error(s, mnstr_errnr(state->to_client), "%s",
mnstr_peek_error(state->to_client));
+ return -1;
+ }
+
+ // if it succeeds, return that to the client.
+ // if it's still a block boundary, return that to the client.
+ // if there's an error, return that to the client.
+ nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+ if (nread > 0)
+ return nread;
+ if (nread == 0) {
+ s->eof = true;
+ state->from_client = NULL;
+ return nread;
+ } else {
+ mnstr_set_error(s, mnstr_errnr(state->from_client), "%s",
mnstr_peek_error(state->from_client));
+ return -1;
+ }
+}
+
+static void
+recv_upload_close(stream *s)
+{
+ struct mapi_recv_upload *state = s->stream_data.p;
+
+ stream *from = state->from_client;
+ if (from)
+ discard(from);
+
+ stream *to = state->to_client;
+ mnstr_write(to, PROMPT3, strlen(PROMPT3), 1);
+ mnstr_flush(to, MNSTR_FLUSH_ALL);
+}
+
+static void
+recv_upload_destroy(stream *s)
+{
+ struct mapi_recv_upload *state = s->stream_data.p;
+ free(state);
+ free(s);
+}
stream*
mapi_request_upload(const char *filename, bool binary, stream *from, stream
*to)
{
- (void)from; (void)to; (void)binary;
- mnstr_set_open_error(filename, 0, "ON CLIENT not supported yet");
- return NULL;
+ const char *msg = NULL;
+ stream *s = NULL;
+ struct mapi_recv_upload *state = NULL;
+ ssize_t nwritten;
+
+ assert(from->readonly);
+ assert(!to->readonly);
+ assert(isa_block_stream(from));
+ assert(isa_block_stream(to));
+
+ if (binary)
+ nwritten = mnstr_printf(to, "%srb %s\n", PROMPT3, filename);
+ else
+ nwritten = mnstr_printf(to, "%sr 0 %s\n", PROMPT3, filename);
+ if (nwritten <= 0) {
+ msg = mnstr_peek_error(to);
+ goto end;
+ }
+ if (mnstr_flush(to, MNSTR_FLUSH_ALL) < 0) {
+ msg = mnstr_peek_error(to);
+ goto end;
+ }
+
+ char buf[256];
+ if (mnstr_readline(from, buf, sizeof(buf)) != 1 || buf[0] != '\n') {
+ msg = buf;
+ discard(from);
+ goto end;
+ }
+
+ // Client accepted the request
+ state = malloc(sizeof(*state));
+ if (!state) {
+ msg = "malloc failed";
+ goto end;
+ }
+ s = create_stream("ONCLIENT");
+ if (!s) {
+ msg = mnstr_peek_error(NULL);
+ goto end;
+ }
+ state->from_client = from;
+ state->to_client = to;
+ s->stream_data.p = state;
+ s->binary= binary;
+ s->read = recv_upload_read;
+ s->close = recv_upload_close;
+ s->destroy = recv_upload_destroy;
+end:
+ if (msg) {
+ mnstr_destroy(s);
+ mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
+ return NULL;
+ } else {
+ return s;
+ }
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]