Changeset: 01bd83558569 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=01bd83558569
Modified Files:
	ctest/tools/monetdbe/example_proxy.c
	monetdb5/modules/mal/remote.c
	monetdb5/modules/mal/remote.h
Branch: monetdbe-proxy
Log Message:
Fix more flush related bugs. diffs (truncated from 564 to 300 lines): diff --git a/ctest/tools/monetdbe/example_proxy.c b/ctest/tools/monetdbe/example_proxy.c --- a/ctest/tools/monetdbe/example_proxy.c +++ b/ctest/tools/monetdbe/example_proxy.c @@ -37,7 +37,7 @@ main(void) if ((err = monetdbe_query(mdbe, "INSERT INTO test VALUES (100, 'WHAAT'); ", NULL, NULL)) != NULL) error(err) - if ((err = monetdbe_query(mdbe, "SELECT x, y FROM test; ", &result, NULL)) != NULL) + if ((err = monetdbe_query(mdbe, "SELECT x, y, 1 AS z FROM test; ", &result, NULL)) != NULL) error(err) fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n", result->ncols, result->nrows); @@ -47,6 +47,24 @@ main(void) if ((err = monetdbe_result_fetch(result, &rcol, c)) != NULL) error(err) switch (rcol->type) { + case monetdbe_int8_t: { + monetdbe_column_int8_t * col = (monetdbe_column_int8_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } + case monetdbe_int16_t: { + monetdbe_column_int16_t * col = (monetdbe_column_int16_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } case monetdbe_int32_t: { monetdbe_column_int32_t * col = (monetdbe_column_int32_t *) rcol; if (col->data[r] == col->null_value) { @@ -83,7 +101,6 @@ main(void) if ((err = monetdbe_query(mdbe, "DELETE FROM test where x = 100;", NULL, NULL)) != NULL) error(err) - if ((err = monetdbe_query(mdbe, "SELECT x, y, 1 AS some_int FROM test WHERE x > 10; ", &result, NULL)) != NULL) error(err) @@ -94,6 +111,24 @@ main(void) if ((err = monetdbe_result_fetch(result, &rcol, c)) != NULL) error(err) switch (rcol->type) { + case monetdbe_int8_t: { + monetdbe_column_int8_t * col = (monetdbe_column_int8_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } + case monetdbe_int16_t: { + monetdbe_column_int16_t * col = 
(monetdbe_column_int16_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } case monetdbe_int32_t: { monetdbe_column_int32_t * col = (monetdbe_column_int32_t *) rcol; if (col->data[r] == col->null_value) { @@ -127,6 +162,66 @@ main(void) if ((err = monetdbe_cleanup_result(mdbe, result)) != NULL) error(err) + if ((err = monetdbe_query(mdbe, "SELECT x, y, 1 AS some_int FROM test WHERE x > 20; ", &result, NULL)) != NULL) + error(err) + + fprintf(stdout, "Query result with %zu cols and %"PRId64" rows\n", result->ncols, result->nrows); + for (int64_t r = 0; r < result->nrows; r++) { + for (size_t c = 0; c < result->ncols; c++) { + monetdbe_column* rcol; + if ((err = monetdbe_result_fetch(result, &rcol, c)) != NULL) + error(err) + switch (rcol->type) { + case monetdbe_int8_t: { + monetdbe_column_int8_t * col = (monetdbe_column_int8_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } + case monetdbe_int16_t: { + monetdbe_column_int16_t * col = (monetdbe_column_int16_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } + case monetdbe_int32_t: { + monetdbe_column_int32_t * col = (monetdbe_column_int32_t *) rcol; + if (col->data[r] == col->null_value) { + printf("NULL"); + } else { + printf("%d", col->data[r]); + } + break; + } + case monetdbe_str: { + monetdbe_column_str * col = (monetdbe_column_str *) rcol; + if (col->is_null(col->data[r])) { + printf("NULL"); + } else { + printf("%s", (char*) col->data[r]); + } + break; + } + default: { + printf("UNKNOWN"); + } + } + + if (c + 1 < result->ncols) { + printf(", "); + } + } + printf("\n"); + } + + if ((err = monetdbe_cleanup_result(mdbe, result)) != NULL) + error(err) if (monetdbe_close(mdbe)) error("Failed to close database") diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c 
--- a/monetdb5/modules/mal/remote.c +++ b/monetdb5/modules/mal/remote.c @@ -554,6 +554,188 @@ str RMTreadbatheader(stream* sin, char* return MAL_SUCCEED; } +typedef struct _binbat_v1 { + int Ttype; + oid Hseqbase; + oid Tseqbase; + bool + Tsorted:1, + Trevsorted:1, + Tkey:1, + Tnonil:1, + Tdense:1; + BUN size; + size_t headsize; + size_t tailsize; + size_t theapsize; +} binbat; + +static str +RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush) +{ + binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0, 0 }; + char *nme = NULL; + char *val = NULL; + char tmp; + size_t len; + lng lv, *lvp; + + BAT *b; + + /* hdr is a JSON structure that looks like + * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0} + * we take the binary data directly from the stream */ + + /* could skip whitespace, but we just don't allow that */ + if (*hdr++ != '{') + throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header (got '%s')", hdr - 1); + while (*hdr != '\0') { + switch (*hdr) { + case '"': + /* we assume only numeric values, so all strings are + * elems */ + if (nme != NULL) { + *hdr = '\0'; + } else { + nme = hdr + 1; + } + break; + case ':': + val = hdr + 1; + break; + case ',': + case '}': + if (val == NULL) + throw(MAL, "remote.bincopyfrom", + "illegal input, JSON value missing"); + *hdr = '\0'; + + lvp = &lv; + len = sizeof(lv); + /* tseqbase can be 1<<31/1<<63 which causes overflow + * in lngFromStr, so we check separately */ + if (strcmp(val, +#if SIZEOF_OID == 8 + "9223372036854775808" +#else + "2147483648" +#endif + ) == 0 && + strcmp(nme, "tseqbase") == 0) { + bb.Tseqbase = oid_nil; + } else { + /* all values should be non-negative, so we check that + * here as well */ + if (lngFromStr(val, &len, &lvp, true) < 0 || + lv < 0 /* includes lng_nil */) + throw(MAL, "remote.bincopyfrom", + "bad %s value: %s", nme, val); + + /* deal with nme and val */ + if (strcmp(nme, "version") == 0) { + if (lv != 1) + throw(MAL, 
"remote.bincopyfrom", + "unsupported version: %s", val); + } else if (strcmp(nme, "hseqbase") == 0) { +#if SIZEOF_OID < SIZEOF_LNG + if (lv > GDK_oid_max) + throw(MAL, "remote.bincopyfrom", + "bad %s value: %s", nme, val); +#endif + bb.Hseqbase = (oid)lv; + } else if (strcmp(nme, "ttype") == 0) { + if (lv >= GDKatomcnt) + throw(MAL, "remote.bincopyfrom", + "bad %s value: %s", nme, val); + bb.Ttype = (int) lv; + } else if (strcmp(nme, "tseqbase") == 0) { +#if SIZEOF_OID < SIZEOF_LNG + if (lv > GDK_oid_max) + throw(MAL, "remote.bincopyfrom", + "bad %s value: %s", nme, val); +#endif + bb.Tseqbase = (oid) lv; + } else if (strcmp(nme, "tsorted") == 0) { + bb.Tsorted = lv != 0; + } else if (strcmp(nme, "trevsorted") == 0) { + bb.Trevsorted = lv != 0; + } else if (strcmp(nme, "tkey") == 0) { + bb.Tkey = lv != 0; + } else if (strcmp(nme, "tnonil") == 0) { + bb.Tnonil = lv != 0; + } else if (strcmp(nme, "tdense") == 0) { + bb.Tdense = lv != 0; + } else if (strcmp(nme, "size") == 0) { + if (lv > (lng) BUN_MAX) + throw(MAL, "remote.bincopyfrom", + "bad %s value: %s", nme, val); + bb.size = (BUN) lv; + } else if (strcmp(nme, "tailsize") == 0) { + bb.tailsize = (size_t) lv; + } else if (strcmp(nme, "theapsize") == 0) { + bb.theapsize = (size_t) lv; + } else { + throw(MAL, "remote.bincopyfrom", + "unknown element: %s", nme); + } + } + nme = val = NULL; + break; + } + hdr++; + } + + b = COLnew(0, bb.Ttype, bb.size, TRANSIENT); + if (b == NULL) + throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL); + + /* for strings, the width may not match, fix it to match what we + * retrieved */ + if (bb.Ttype == TYPE_str && bb.size) { + b->twidth = (unsigned short) (bb.tailsize / bb.size); + b->tshift = ATOMelmshift(Tsize(b)); + } + + if (bb.tailsize > 0) { + if (HEAPextend(&b->theap, bb.tailsize, true) != GDK_SUCCEED || + mnstr_read(in, b->theap.base, bb.tailsize, 1) < 0) + goto bailout; + b->theap.dirty = true; + } + if (bb.theapsize > 0) { + if (HEAPextend(b->tvheap, bb.theapsize, 
 true) != GDK_SUCCEED || + mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0) + goto bailout; + b->tvheap->free = bb.theapsize; + b->tvheap->dirty = true; + } +
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list