Changeset: 31989c598f67 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=31989c598f67
Modified Files:
clients/mapiclient/mclient.c
clients/mapilib/mapi.c
common/stream/stream.c
common/stream/stream.h
sql/backends/monet5/sql_result.c
Branch: protocol
Log Message:
result sets actually working
diffs (truncated from 395 to 300 lines):
diff --git a/clients/mapiclient/mclient.c b/clients/mapiclient/mclient.c
--- a/clients/mapiclient/mclient.c
+++ b/clients/mapiclient/mclient.c
@@ -856,6 +856,7 @@ CSVrenderer(MapiHdl hdl)
i == 0 ? "" : sep, s);
}
mnstr_printf(toConsole, "\n");
+ mnstr_flush(toConsole);
}
}
diff --git a/clients/mapilib/mapi.c b/clients/mapilib/mapi.c
--- a/clients/mapilib/mapi.c
+++ b/clients/mapilib/mapi.c
@@ -818,6 +818,9 @@
#define MAPIBLKSIZE 256 /* minimum buffer shipped */
+typedef char* (*mapi_converter)(void*);
+
+
/* information about the columns in a result set */
struct MapiColumn {
char *tablename;
@@ -826,7 +829,9 @@ struct MapiColumn {
int columnlength;
int digits;
int scale;
- void* dataprot10;
+ char* buffer_ptr;
+ char write_buf[50];
+ mapi_converter converter;
};
/* information about bound columns */
@@ -2662,6 +2667,7 @@ mapi_reconnect(Mapi mid)
return mid->error;
}
}
+ mid->protocol = prot_version;
/* in rest now should be the byte order of the server */
byteo = rest;
@@ -2803,8 +2809,7 @@ mapi_reconnect(Mapi mid)
check_stream(mid, mid->to, "Could not send initial byte sequence",
"mapi_reconnect", mid->error);
if (prot_version == prot10 || prot_version == prot10compressed) {
-
- printf("Using protocol version %s.\n", prot_version == prot10
? "PROT10" : "PROT10COMPR");
+ //printf("Using protocol version %s.\n", prot_version == prot10
? "PROT10" : "PROT10COMPR");
assert(isa_block_stream(mid->to));
assert(isa_block_stream(mid->from));
@@ -3986,6 +3991,41 @@ parse_header_line(MapiHdl hdl, char *lin
lines may cause a new result set to be created in which case all
subsequent lines are added to that result set.
*/
+
+static char* mapi_convert_varchar(struct MapiColumn *col) {
+ return col->buffer_ptr;
+}
+
+static char* mapi_convert_int(struct MapiColumn *col) {
+ sprintf(col->write_buf, "%d", *((int*) col->buffer_ptr));
+ return (char*) col->write_buf;
+}
+
+static char* mapi_convert_smallint(struct MapiColumn *col) {
+ sprintf(col->write_buf, "%hd", *((short*) col->buffer_ptr));
+ return (char*) col->write_buf;
+}
+
+static char* mapi_convert_tinyint(struct MapiColumn *col) {
+ sprintf(col->write_buf, "%hhd", *((signed char*) col->buffer_ptr));
+ return (char*) col->write_buf;
+}
+
+static char* mapi_convert_boolean(struct MapiColumn *col) {
+ if (*((signed char*) col->buffer_ptr) == 1) {
+ return "true";
+ }
+ else {
+ return "false";
+ }
+}
+
+static char* mapi_convert_unknown(struct MapiColumn *col) {
+ (void) col;
+ return "<unknown>";
+}
+
+
static MapiMsg
read_into_cache(MapiHdl hdl, int lookahead)
{
@@ -4012,7 +4052,8 @@ read_into_cache(MapiHdl hdl, int lookahe
lng nr_rows;
lng nr_cols;
lng i;
- result = malloc(sizeof(struct MapiResultSet));
+ result = new_result(hdl);
+
if (!result) {
// TODO: actually set mid->error :)
return mid->error;
@@ -4022,13 +4063,20 @@ read_into_cache(MapiHdl hdl, int lookahe
!mnstr_readLng(mid->from, &nr_cols)) {
return mid->error;
}
- fprintf(stderr, "result_set_id=%d, nr_rows=%llu,
nr_cols=%lld\n", result_set_id, nr_rows, nr_cols);
+ // fprintf(stderr, "result_set_id=%d, nr_rows=%llu,
nr_cols=%lld\n", result_set_id, nr_rows, nr_cols);
result->fieldcnt = nr_cols;
+ result->maxfields = nr_cols;
result->row_count = nr_rows;
result->fields = malloc(sizeof(struct MapiColumn) *
result->fieldcnt);
result->tableid = result_set_id;
result->querytype = Q_TABLE;
result->tuple_count = 0;
+ result->rows_read = 0;
+// result->errorstr = NULL;
+// result->cache.line = NULL;
+// result->next = NULL;
+// result->hdl = hdl;
+
for (i = 0; i < nr_cols; i++) {
lng col_info_length;
@@ -4052,11 +4100,28 @@ read_into_cache(MapiHdl hdl, int lookahe
!mnstr_readInt(mid->from,
&typelen)) {
return mid->error;
}
- fprintf(stderr, "%lld col_info_length=%lld,
table_name=%s, col_name=%s, type_sql_name=%s, type_len=%d\n",
- i, col_info_length, table_name,
col_name, type_sql_name, typelen);
+ // fprintf(stderr, "%lld col_info_length=%lld,
table_name=%s, col_name=%s, type_sql_name=%s, type_len=%d\n",
+ // i, col_info_length, table_name,
col_name, type_sql_name, typelen);
result->fields[i].columnname = col_name;
result->fields[i].tablename = table_name;
result->fields[i].columntype = type_sql_name;
+ result->fields[i].columnlength = typelen;
+
+ if (strcasecmp(type_sql_name, "varchar") == 0) {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_varchar;
+ } else if (strcasecmp(type_sql_name, "int") ==
0) {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_int;
+ } else if (strcasecmp(type_sql_name,
"smallint") == 0) {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_smallint;
+ } else if (strcasecmp(type_sql_name, "tinyint")
== 0) {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_tinyint;
+ } else if (strcasecmp(type_sql_name, "boolean")
== 0) {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_boolean;
+ } else {
+ result->fields[i].converter =
(mapi_converter) mapi_convert_unknown;
+ // TODO: complain
+ }
+ // TODO: NULLs
}
hdl->result = result;
hdl->active = result;
@@ -5287,25 +5352,62 @@ mapi_fetch_row(MapiHdl hdl)
{
char *reply;
int n;
+ size_t i;
struct MapiResultSet *result;
- if (hdl->mid->protocol == prot10) {
+ if (hdl->mid->protocol == prot10 || hdl->mid->protocol ==
prot10compressed) {
+ char* buf;
+
result = hdl->result;
result->rows_read++;
+ if (result->rows_read >= result->row_count) {
+ hdl->mid->active = NULL;
+ hdl->active = NULL;
+ return 0;
+ }
// do we have any rows in our cache
- if (result->rows_read > result->tuple_count &&
result->rows_read < result->row_count) {
+ if (result->rows_read > result->tuple_count) {
// read block from socket
lng nrows = 0;
- // we flush on the other side so this read will always
fail
- if (!mnstr_readLng(mid->from, &nrows)) {
+ if (!mnstr_writeChr(hdl->mid->to, 42) ||
mnstr_flush(hdl->mid->to)) {
+ // FIXME: set hdl->mid to something
+
+ return hdl->mid->error;
+ }
+ bs2_resetbuf(hdl->mid->from); // kinda a bit evil
+ // this actually triggers the read of the entire block
from now we operate on buffer
+ if (!mnstr_readLng(hdl->mid->from, &nrows)) {
// FIXME: set hdl->mid to something
return hdl->mid->error;
}
+ bs2_resetbuf(hdl->mid->from);
fprintf(stderr, "nrows=%llu\n", nrows);
-
+ buf = (char*) bs2_getbuf(hdl->mid->from) + sizeof(lng);
// iterate over cols
+ for (i = 0; i < (size_t) result->fieldcnt; i++) {
+ result->fields[i].buffer_ptr = buf;
+ if (result->fields[i].columnlength < 0) {
+ // variable-length column
+ lng col_len = *((lng*) buf);
+ assert((size_t) col_len <
hdl->mid->blocksize && col_len > 0);
+ result->fields[i].buffer_ptr +=
sizeof(lng);
+ buf += col_len + sizeof(lng);
+ } else {
+ buf += nrows *
result->fields[i].columnlength;
+ }
+ }
+ result->tuple_count += nrows;
+ } else {
+ for (i = 0; i < (size_t) result->fieldcnt; i++) {
+ if (result->fields[i].columnlength < 0) {
+ // variable-length column
+ result->fields[i].buffer_ptr +=
strlen(result->fields[i].buffer_ptr) + 1;
+ } else {
+ result->fields[i].buffer_ptr +=
result->fields[i].columnlength;
+ }
+ }
}
return result->fieldcnt;
}
@@ -5364,12 +5466,22 @@ mapi_fetch_all_rows(MapiHdl hdl)
return result ? result->cache.tuplecount : 0;
}
+
char *
mapi_fetch_field(MapiHdl hdl, int fnr)
{
int cr;
struct MapiResultSet *result;
-
+ if (hdl->mid->protocol == prot10 || hdl->mid->protocol ==
prot10compressed) {
+ result = hdl->result;
+ assert (result->rows_read <= result->tuple_count &&
result->rows_read <= result->row_count);
+ if (fnr > result->fieldcnt) {
+ mapi_setError(hdl->mid, "column index out of bounds",
"mapi_fetch_field", MERROR);
+ return NULL;
+ }
+
+ return result->fields[fnr].converter((void*)
&result->fields[fnr]);
+ }
mapi_hdl_check0(hdl, "mapi_fetch_field");
if ((result = hdl->result) == NULL ||
@@ -5396,7 +5508,8 @@ mapi_fetch_field_len(MapiHdl hdl, int fn
{
int cr;
struct MapiResultSet *result;
-
+ // FIXME
+ assert(0);
mapi_hdl_check0(hdl, "mapi_fetch_field_len");
if ((result = hdl->result) == NULL ||
diff --git a/common/stream/stream.c b/common/stream/stream.c
--- a/common/stream/stream.c
+++ b/common/stream/stream.c
@@ -4378,6 +4378,26 @@ bs2_read(stream *ss, void *buf, size_t e
}
+
+void*
+bs2_getbuf(stream *ss)
+{
+ bs2 *s = (bs2 *) ss->stream_data.p;
+ assert(ss->read == bs2_read);
+ return (void*) s->buf;
+}
+
+
+void
+bs2_resetbuf(stream *ss)
+{
+ bs2 *s = (bs2 *) ss->stream_data.p;
+ assert(ss->read == bs2_read);
+ s->itotal = 0;
+ s->nr = 0;
+ s->readpos = 0;
+}
+
int
isa_block_stream(stream *s)
{
@@ -4452,6 +4472,14 @@ mnstr_readChr(stream *s, char *val)
}
int
+mnstr_writeChr(stream *s, char val)
+{
+ if (s == NULL || s->errnr)
+ return 0;
+ return s->write(s, (void *) &val, sizeof(val), 1) == 1;
+}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list