Changeset: 31989c598f67 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=31989c598f67
Modified Files:
        clients/mapiclient/mclient.c
        clients/mapilib/mapi.c
        common/stream/stream.c
        common/stream/stream.h
        sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:

result sets actually working


diffs (truncated from 395 to 300 lines):

diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -856,6 +856,7 @@ CSVrenderer(MapiHdl hdl)
                                             i == 0 ? "" : sep, s);
                }
                mnstr_printf(toConsole, "\n");
+               mnstr_flush(toConsole);
        }
 }
 
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -818,6 +818,9 @@
 
 #define MAPIBLKSIZE    256     /* minimum buffer shipped */
 
+typedef char* (*mapi_converter)(void*); /* per-column callback returning the column's current value as a C string; invoked with a struct MapiColumn* passed as void* */
+
+
 /* information about the columns in a result set */
 struct MapiColumn {
        char *tablename;
@@ -826,7 +829,9 @@ struct MapiColumn {
        int columnlength;
        int digits;
        int scale;
-       void* dataprot10;
+       char* buffer_ptr;
+       char write_buf[50];
+       mapi_converter converter;
 };
 
 /* information about bound columns */
@@ -2662,6 +2667,7 @@ mapi_reconnect(Mapi mid)
                                return mid->error;
                        }
                }
+               mid->protocol = prot_version;
 
                /* in rest now should be the byte order of the server */
                byteo = rest;
@@ -2803,8 +2809,7 @@ mapi_reconnect(Mapi mid)
        check_stream(mid, mid->to, "Could not send initial byte sequence", 
"mapi_reconnect", mid->error);
 
        if (prot_version == prot10 || prot_version == prot10compressed) {
-
-               printf("Using protocol version %s.\n", prot_version == prot10  
? "PROT10" : "PROT10COMPR");
+               //printf("Using protocol version %s.\n", prot_version == prot10 
 ? "PROT10" : "PROT10COMPR");
                assert(isa_block_stream(mid->to));
                assert(isa_block_stream(mid->from));
 
@@ -3986,6 +3991,41 @@ parse_header_line(MapiHdl hdl, char *lin
    lines may cause a new result set to be created in which case all
    subsequent lines are added to that result set.
 */
+
+/* Converter for varchar columns: returns the column's current buffer
+ * position as-is, without copying or formatting. */
+static char* mapi_convert_varchar(struct MapiColumn *col) {
+       char *text = col->buffer_ptr;
+       return text;
+}
+
+/* Converter for int columns: formats the int stored at the column's current
+ * buffer position as decimal text into col->write_buf and returns that
+ * buffer. The text is overwritten by the next conversion on the same column. */
+static char* mapi_convert_int(struct MapiColumn *col) {
+       int value;
+       /* buffer_ptr points into a packed byte buffer that is advanced by
+        * arbitrary byte counts, so it may be misaligned for int: copy the
+        * bytes out instead of dereferencing a cast pointer */
+       memcpy(&value, col->buffer_ptr, sizeof(value));
+       /* bounded write so a format change can never overrun write_buf */
+       snprintf(col->write_buf, sizeof(col->write_buf), "%d", value);
+       return (char*) col->write_buf;
+}
+
+/* Converter for smallint columns: formats the short stored at the column's
+ * current buffer position as decimal text into col->write_buf and returns
+ * that buffer. The text is overwritten by the next conversion on the same
+ * column. */
+static char* mapi_convert_smallint(struct MapiColumn *col) {
+       short value;
+       /* buffer_ptr points into a packed byte buffer that is advanced by
+        * arbitrary byte counts, so it may be misaligned for short: copy the
+        * bytes out instead of dereferencing a cast pointer */
+       memcpy(&value, col->buffer_ptr, sizeof(value));
+       /* bounded write so a format change can never overrun write_buf */
+       snprintf(col->write_buf, sizeof(col->write_buf), "%hd", value);
+       return (char*) col->write_buf;
+}
+
+/* Converter for tinyint columns: formats the signed byte at the column's
+ * current buffer position as decimal text into col->write_buf and returns
+ * that buffer. The text is overwritten by the next conversion on the same
+ * column. */
+static char* mapi_convert_tinyint(struct MapiColumn *col) {
+       signed char value = *((signed char*) col->buffer_ptr);
+       /* bounded write so a format change can never overrun write_buf */
+       snprintf(col->write_buf, sizeof(col->write_buf), "%hhd", value);
+       return (char*) col->write_buf;
+}
+
+/* Converter for boolean columns: yields "true" only when the signed byte at
+ * the column's current buffer position equals 1, and "false" for any other
+ * value. The returned strings are literals and must not be modified. */
+static char* mapi_convert_boolean(struct MapiColumn *col) {
+       signed char flag = *((signed char*) col->buffer_ptr);
+       return flag == 1 ? "true" : "false";
+}
+
+/* Fallback converter for column types without a dedicated converter: ignores
+ * the column data and always returns a fixed placeholder string. */
+static char* mapi_convert_unknown(struct MapiColumn *col) {
+       char *placeholder = "<unknown>";
+       (void) col;     /* unused: the placeholder does not depend on the data */
+       return placeholder;
+}
+
+
 static MapiMsg
 read_into_cache(MapiHdl hdl, int lookahead)
 {
@@ -4012,7 +4052,8 @@ read_into_cache(MapiHdl hdl, int lookahe
                        lng nr_rows;
                        lng nr_cols;
                        lng i;
-                       result = malloc(sizeof(struct MapiResultSet));
+                       result = new_result(hdl);
+
                        if (!result) {
                                // TODO: actually set mid->error :)
                                return mid->error;
@@ -4022,13 +4063,20 @@ read_into_cache(MapiHdl hdl, int lookahe
                                        !mnstr_readLng(mid->from, &nr_cols)) {
                                return mid->error;
                        }
-                       fprintf(stderr, "result_set_id=%d, nr_rows=%llu, 
nr_cols=%lld\n", result_set_id, nr_rows, nr_cols);
+               //      fprintf(stderr, "result_set_id=%d, nr_rows=%llu, 
nr_cols=%lld\n", result_set_id, nr_rows, nr_cols);
                        result->fieldcnt = nr_cols;
+                       result->maxfields = nr_cols;
                        result->row_count = nr_rows;
                        result->fields = malloc(sizeof(struct MapiColumn) * 
result->fieldcnt);
                        result->tableid = result_set_id;
                        result->querytype = Q_TABLE;
                        result->tuple_count = 0;
+                       result->rows_read = 0;
+//                     result->errorstr = NULL;
+//                     result->cache.line = NULL;
+//                     result->next = NULL;
+//                     result->hdl = hdl;
+
 
                        for (i = 0; i < nr_cols; i++) {
                                lng col_info_length;
@@ -4052,11 +4100,28 @@ read_into_cache(MapiHdl hdl, int lookahe
                                                !mnstr_readInt(mid->from, 
&typelen)) {
                                        return mid->error;
                                }
-                               fprintf(stderr, "%lld col_info_length=%lld, 
table_name=%s, col_name=%s, type_sql_name=%s, type_len=%d\n",
-                                               i, col_info_length, table_name, 
col_name, type_sql_name, typelen);
+       //                      fprintf(stderr, "%lld col_info_length=%lld, 
table_name=%s, col_name=%s, type_sql_name=%s, type_len=%d\n",
+       //                                      i, col_info_length, table_name, 
col_name, type_sql_name, typelen);
                                result->fields[i].columnname = col_name;
                                result->fields[i].tablename = table_name;
                                result->fields[i].columntype = type_sql_name;
+                               result->fields[i].columnlength = typelen;
+
+                               if (strcasecmp(type_sql_name, "varchar") == 0) {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_varchar;
+                               } else if (strcasecmp(type_sql_name, "int") == 
0) {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_int;
+                               } else if (strcasecmp(type_sql_name, 
"smallint") == 0) {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_smallint;
+                               } else if (strcasecmp(type_sql_name, "tinyint") 
== 0) {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_tinyint;
+                               } else if (strcasecmp(type_sql_name, "boolean") 
== 0) {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_boolean;
+                               } else {
+                                       result->fields[i].converter = 
(mapi_converter) mapi_convert_unknown;
+                                       // TODO: complain
+                               }
+                               // TODO: NULLs
                        }
                        hdl->result = result;
                        hdl->active = result;
@@ -5287,25 +5352,62 @@ mapi_fetch_row(MapiHdl hdl)
 {
        char *reply;
        int n;
+       size_t i;
        struct MapiResultSet *result;
 
-       if (hdl->mid->protocol == prot10) {
+       if (hdl->mid->protocol == prot10 || hdl->mid->protocol == 
prot10compressed) {
+               char* buf;
+
                result = hdl->result;
                result->rows_read++;
+               if (result->rows_read >= result->row_count) {
+                       hdl->mid->active = NULL;
+                       hdl->active = NULL;
+                       return 0;
+               }
                // do we have any rows in our cache
-               if (result->rows_read > result->tuple_count && 
result->rows_read < result->row_count) {
+               if (result->rows_read > result->tuple_count) {
                        // read block from socket
                        lng nrows = 0;
-                       // we flush on the other side so this read will always 
fail
-                       if (!mnstr_readLng(mid->from, &nrows)) {
+                       if (!mnstr_writeChr(hdl->mid->to, 42) || 
mnstr_flush(hdl->mid->to)) {
+                               // FIXME: set hdl->mid to something
+
+                               return hdl->mid->error;
+                       }
+                       bs2_resetbuf(hdl->mid->from); // kinda a bit evil
+                       // this actually triggers the read of the entire block 
from now we operate on buffer
+                       if (!mnstr_readLng(hdl->mid->from, &nrows)) {
                                // FIXME: set hdl->mid to something
                                return hdl->mid->error;
                        }
+                       bs2_resetbuf(hdl->mid->from);
 
                        fprintf(stderr, "nrows=%llu\n", nrows);
-
+                       buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
 
                        // iterate over cols
+                       for (i = 0; i < (size_t) result->fieldcnt; i++) {
+                               result->fields[i].buffer_ptr = buf;
+                               if (result->fields[i].columnlength < 0) {
+                                       // variable-length column
+                                       lng col_len = *((lng*) buf);
+                                       assert((size_t) col_len < 
hdl->mid->blocksize && col_len > 0);
+                                       result->fields[i].buffer_ptr += 
sizeof(lng);
+                                       buf += col_len + sizeof(lng);
+                               } else {
+                                       buf += nrows * 
result->fields[i].columnlength;
+                               }
+                       }
+                       result->tuple_count += nrows;
+               } else {
+                       for (i = 0; i < (size_t) result->fieldcnt; i++) {
+                               if (result->fields[i].columnlength < 0) {
+                                       // variable-length column
+                                       result->fields[i].buffer_ptr += 
strlen(result->fields[i].buffer_ptr) + 1;
+                               } else {
+                                       result->fields[i].buffer_ptr += 
result->fields[i].columnlength;
+                               }
+                       }
                }
                return result->fieldcnt;
        }
@@ -5364,12 +5466,22 @@ mapi_fetch_all_rows(MapiHdl hdl)
        return result ? result->cache.tuplecount : 0;
 }
 
+
 char *
 mapi_fetch_field(MapiHdl hdl, int fnr)
 {
        int cr;
        struct MapiResultSet *result;
-
+       if (hdl->mid->protocol == prot10 || hdl->mid->protocol == 
prot10compressed) {
+               result = hdl->result;
+               assert (result->rows_read <= result->tuple_count && 
result->rows_read <= result->row_count);
+               if (fnr > result->fieldcnt) {
+                       mapi_setError(hdl->mid, "column index out of bounds", 
"mapi_fetch_field", MERROR);
+                       return NULL;
+               }
+
+               return result->fields[fnr].converter((void*) 
&result->fields[fnr]);
+       }
        mapi_hdl_check0(hdl, "mapi_fetch_field");
 
        if ((result = hdl->result) == NULL ||
@@ -5396,7 +5508,8 @@ mapi_fetch_field_len(MapiHdl hdl, int fn
 {
        int cr;
        struct MapiResultSet *result;
-
+       // FIXME
+       assert(0);
        mapi_hdl_check0(hdl, "mapi_fetch_field_len");
 
        if ((result = hdl->result) == NULL ||
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4378,6 +4378,26 @@ bs2_read(stream *ss, void *buf, size_t e
 }
 
 
+
+void*
+bs2_getbuf(stream *ss)
+{
+       /* Return the raw internal buffer of a bs2 stream; the assert checks
+        * that ss really is one (its read callback is bs2_read). */
+       bs2 *state;
+
+       assert(ss->read == bs2_read);
+       state = (bs2 *) ss->stream_data.p;
+       return (void*) state->buf;
+}
+
+
+void
+bs2_resetbuf(stream *ss)
+{
+       /* Zero the read-side bookkeeping fields (readpos, nr, itotal) of a
+        * bs2 stream; the assert checks that ss really is one. */
+       bs2 *state;
+
+       assert(ss->read == bs2_read);
+       state = (bs2 *) ss->stream_data.p;
+       state->readpos = 0;
+       state->nr = 0;
+       state->itotal = 0;
+}
+
 int
 isa_block_stream(stream *s)
 {
@@ -4452,6 +4472,14 @@ mnstr_readChr(stream *s, char *val)
 }
 
 int
+mnstr_writeChr(stream *s, char val)
+{
+       /* Write one char through the stream's write callback.
+        * Returns 1 when exactly one element was written, and 0 when the
+        * stream is NULL, already has an error set, or the write fell short. */
+       if (s != NULL && !s->errnr)
+               return s->write(s, (void *) &val, sizeof(val), 1) == 1;
+       return 0;
+}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to