Changeset: db2c06e5287d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/db2c06e5287d
Modified Files:
        common/stream/mapi_stream.c
Branch: default
Log Message:

Fully implement mapi_request_upload


diffs (153 lines):

diff --git a/common/stream/mapi_stream.c b/common/stream/mapi_stream.c
--- a/common/stream/mapi_stream.c
+++ b/common/stream/mapi_stream.c
@@ -9,20 +9,146 @@
 #include "monetdb_config.h"
 #include "stream.h"
 #include "stream_internal.h"
+#include "mapi_prompt.h"
+
+
+/* Drain stream s: keep reading into a scratch buffer until mnstr_read
+ * returns zero or a negative value. The data read is thrown away. */
+static void
+discard(stream *s)
+{
+       // scratch space only; whatever lands here is never looked at
+       static char bitbucket[8192];
+       ssize_t got;
+
+       do {
+               got = mnstr_read(s, bitbucket, 1, sizeof(bitbucket));
+       } while (got > 0);
+}
 
 /* Private state of the stream returned by mapi_request_upload: the two
  * client connection streams the upload is carried over. */
 struct mapi_recv_upload {
        stream *from_client; // upload data is read from here; recv_upload_read sets it to NULL once the read following a PROMPT2 returns 0
        stream *to_client; // prompts (PROMPT2, PROMPT3) are written here; NOTE(review): never set to NULL in the code shown here
 };
 
+/* Read callback of the upload stream: hands data sent by the client to the caller.
+ *
+ * When the client stream yields a 0-length read that is not end-of-file,
+ * PROMPT2 is written and flushed to the client and the read is tried once more.
+ * A 0 on that second attempt ends the upload: s->eof is set and from_client
+ * is cleared, so every later call returns 0 straight away.
+ *
+ * Returns the result of mnstr_read on the client stream. On failure the
+ * result is negative and the error of the failing client stream has been
+ * copied onto s.
+ */
+static ssize_t
+recv_upload_read(stream *s, void *buf, size_t elmsize, size_t cnt)
+{
+       struct mapi_recv_upload *state = s->stream_data.p;
 
+       if (state->from_client == NULL) {
+               // the upload was already completed by an earlier call
+               assert(s->eof);
+               return 0;
+       }
+
+       ssize_t nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+       if (nread < 0) {
+               // make the failure visible on s too, exactly as the retry below does
+               mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
+               return nread;
+       }
+       if (nread > 0 || state->from_client->eof)
+               return nread;
+
+       // before returning the 0 we send the prompt and make another attempt.
+       if (
+                       mnstr_write(state->to_client, PROMPT2, strlen(PROMPT2), 1) != 1
+               ||      mnstr_flush(state->to_client, MNSTR_FLUSH_ALL) < 0
+       ) {
+               mnstr_set_error(s, mnstr_errnr(state->to_client), "%s", mnstr_peek_error(state->to_client));
+               return -1;
+       }
+
+       // if it succeeds, return that to the client.
+       // if it's still a block boundary, return that to the client.
+       // if there's an error, return that to the client.
+       nread = mnstr_read(state->from_client, buf, elmsize, cnt);
+       if (nread > 0)
+               return nread;
+       if (nread == 0) {
+               // nothing arrived after the prompt: the upload is finished
+               s->eof = true;
+               state->from_client = NULL;
+               return nread;
+       } else {
+               mnstr_set_error(s, mnstr_errnr(state->from_client), "%s", mnstr_peek_error(state->from_client));
+               return -1;
+       }
+}
+
+/* Close callback of the upload stream.
+ * Input still pending on the client stream is drained first; after that
+ * PROMPT3 is written and flushed to the client. The results of that write
+ * and flush are not checked. */
+static void
+recv_upload_close(stream *s)
+{
+       struct mapi_recv_upload *upload = s->stream_data.p;
+
+       // from_client is NULL once recv_upload_read has seen the end of the upload
+       if (upload->from_client != NULL)
+               discard(upload->from_client);
+
+       mnstr_write(upload->to_client, PROMPT3, strlen(PROMPT3), 1);
+       mnstr_flush(upload->to_client, MNSTR_FLUSH_ALL);
+}
+
+/* Destroy callback of the upload stream: frees the private state hanging
+ * off s and then s itself. The two client streams are not touched here. */
+static void
+recv_upload_destroy(stream *s)
+{
+       free(s->stream_data.p);
+       free(s);
+}
 
 
 stream*
 mapi_request_upload(const char *filename, bool binary, stream *from, stream 
*to)
 {
-       (void)from; (void)to; (void)binary;
-       mnstr_set_open_error(filename, 0, "ON CLIENT not supported yet");
-       return NULL;
+       const char *msg = NULL;
+       stream *s = NULL;
+       struct mapi_recv_upload *state = NULL;
+       ssize_t nwritten;
+
+       assert(from->readonly);
+       assert(!to->readonly);
+       assert(isa_block_stream(from));
+       assert(isa_block_stream(to));
+
+       if (binary)
+               nwritten = mnstr_printf(to, "%srb %s\n", PROMPT3, filename);
+       else
+               nwritten = mnstr_printf(to, "%sr 0 %s\n", PROMPT3, filename);
+       if (nwritten <= 0) {
+               msg = mnstr_peek_error(to);
+               goto end;
+       }
+       if (mnstr_flush(to, MNSTR_FLUSH_ALL) < 0) {
+               msg = mnstr_peek_error(to);
+               goto end;
+       }
+
+       char buf[256];
+       if (mnstr_readline(from, buf, sizeof(buf)) != 1 || buf[0] != '\n') {
+               msg = buf;
+               discard(from);
+               goto end;
+       }
+
+       // Client accepted the request
+       state = malloc(sizeof(*state));
+       if (!state) {
+               msg = "malloc failed";
+               goto end;
+       }
+       s = create_stream("ONCLIENT");
+       if (!s) {
+               msg = mnstr_peek_error(NULL);
+               goto end;
+       }
+       state->from_client = from;
+       state->to_client = to;
+       s->stream_data.p = state;
+       s->binary= binary;
+       s->read = recv_upload_read;
+       s->close = recv_upload_close;
+       s->destroy = recv_upload_destroy;
+end:
+       if (msg) {
+               mnstr_destroy(s);
+               mnstr_set_open_error(filename, 0, "ON CLIENT: %s", msg);
+               return NULL;
+       } else {
+               return s;
+       }
 }
 
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to