Changeset: 8a26d4902af5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8a26d4902af5
Modified Files:
Branch: default
Log Message:
Merge with default
diffs (truncated from 472 to 300 lines):
diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_int.mal
b/monetdb5/modules/mal/Tests/bincopyfrom_int.mal
new file mode 100644
index
0000000000000000000000000000000000000000..c9f3f7ce9f0cd3a59dfb1ecee1135ec92f2ed288
GIT binary patch
literal 252
zc$_7U!A`_53`Bd*SB$ckE{awQsyGyJSiXTDNES<tNSzXUw+e`VC$zVjXM4s6=oAG?
zqR5;o)?7ZNNqD5Fnh3-n(KDoq(u{s3+=LKW9VEl2)mJeG3<$Nw99>H!yj|XL&8$AM
zY=Ccbpn~D4U2dR_k}+X?Ei`APQ}7xhU-MG?<BR#RcPbJy+wsxaN$Wy(JUgqqdUjUm
lD|p$d#BbUb|3h~(G&7N3c6a_Y4_()pm`_V-ulJ#K`~d@fQR@Hz
diff --git a/monetdb5/modules/mal/Tests/bincopyfrom_str.mal
b/monetdb5/modules/mal/Tests/bincopyfrom_str.mal
new file mode 100644
index
0000000000000000000000000000000000000000..df2b5b515cb6cf8377d7db5528315802e33242c0
GIT binary patch
literal 8482
zc%1Fpu}%Xq3<luNnHZRutaQ0br;0)c)QSOCRtDDGVQ~>@a>RxzMLZxIFTy);+zmVd
zV&MOzNVb*Oet^tLJ1?XxN-2gy`;;GL2+2|Ks)32)y?2qMi1LoAJ=3!=iRPLlkqdiQ
z#LQ_;O&Quy&zaiJ<wZTTX1cz*r8Y%%KFU}a9{Tt|w&M_zB_GG?a9nRpv$@Q?OUlc}
zY2#y?`cf^n=GDG%7N4TstZLh7>Dg{hYpYYVI<KwpQ{!YRi$Cyqa~K<blJ6N)c5${^
zznP;Mz;B?xUH||900000000000002sf4Vuo`}jP0Ka778X2z#wpMBJ-S!~T04^>_}
diff --git a/monetdb5/modules/mal/remote.mx b/monetdb5/modules/mal/remote.mx
--- a/monetdb5/modules/mal/remote.mx
+++ b/monetdb5/modules/mal/remote.mx
@@ -132,6 +132,20 @@
address RMTbatload
comment "create a BAT of the given type and size, and load values from the
input stream";
+pattern batbincopy(b:bat):void
+address RMTbincopyto
+comment "dump BAT b in binary form to the stream";
+pattern batbincopy():bat[:void,:any]
+address RMTbincopyfrom
+comment "store the binary BAT data in the BBP and return as BAT";
+
+pattern bintype():void
+address RMTbintype
+comment "print the binary type of this mserver5";
+
+
+# initialise our localtype
+remote.prelude();
@{
@h
@@ -183,10 +197,19 @@
*/
@- Implementation
@c
+
+#define RMTT_L_ENDIAN 0<<1
+#define RMTT_B_ENDIAN 1<<1
+#define RMTT_32_BITS 0<<2
+#define RMTT_64_BITS 1<<2
+#define RMTT_32_OIDS 0<<3
+#define RMTT_64_OIDS 1<<3
+
typedef struct _connection {
MT_Lock lock; /* lock to avoid interference */
str name; /* the handle for this connection */
Mapi mconn; /* the Mapi handle for the connection */
+ unsigned char type; /* binary profile of the connection
target */
size_t nextid; /* id counter */
struct _connection *next; /* the next connection in the list */
} *connection;
@@ -196,8 +219,10 @@
remote_export str RMTepilogue(int *ret);
@c
static connection conns = NULL;
+static unsigned char localtype = 0;
static inline str RMTquery(MapiHdl *ret, str func, Mapi conn, str query);
+static inline str RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in);
@h
remote_export str RMTresolve(int *ret, str *pat);
@@ -282,6 +307,7 @@
char conn[BUFSIZ];
char *s;
Mapi m;
+ MapiHdl hdl;
/* just make sure the return isn't garbage */
*ret = 0;
@@ -298,6 +324,9 @@
if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is
"
"NULL or nil");
+ if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
+ throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenation
'%s' "
+ "is not supported", *scen);
m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
if (mapi_error(m))
@@ -331,6 +360,15 @@
c->next = conns;
conns = c;
+ RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
+ if (hdl != NULL && mapi_fetch_row(hdl)) {
+ char *val = mapi_fetch_field(hdl, 0);
+ c->type = (unsigned char)atoi(val);
+ mapi_close_handle(hdl);
+ } else {
+ c->type = 0;
+ }
+
MT_lock_init(&c->lock, "remote connection lock");
#ifdef _DEBUG_MAPI_
@@ -521,6 +559,24 @@
str RMTprelude(int *ret) {
(void)ret;
+ int type = 0;
+
+#ifdef WORDS_BIGENDIAN
+ type |= RMTT_B_ENDIAN;
+#else
+ type |= RMTT_L_ENDIAN;
+#endif
+#if SIZEOF_SIZE_T == SIZEOF_LONG_LONG
+ type |= RMTT_64_BITS;
+#else
+ type |= RMTT_32_BITS;
+#endif
+#if SIZEOF_SIZE_T == SIZEOF_INT || defined(MONET_OID32)
+ type |= RMTT_32_OIDS;
+#else
+ type |= RMTT_64_OIDS;
+#endif
+ localtype = (unsigned char)type;
return(MAL_SUCCEED);
}
@@ -560,7 +616,7 @@
str conn, ident, tmp, rt;
connection c;
char qbuf[BUFSIZ + 1];
- MapiHdl mhdl;
+ MapiHdl mhdl = NULL;
int rtype;
ValPtr v;
@@ -593,7 +649,8 @@
rt, ident);
GDKfree(rt);
- if (isaBatType(rtype)) {
+ if (isaBatType(rtype) && (localtype == 0 || localtype != c->type ||
getHeadType(rtype) != TYPE_void))
+ {
int h, t, s;
ptr l, r;
str val, var;
@@ -608,9 +665,6 @@
/* this call should be a single transaction over the channel*/
mal_set_lock(c->lock, "remote.get");
-#ifdef _DEBUG_REMOTE
- mnstr_printf(cntxt->fdout, "#remote:%s:%s\n", c->name, qbuf);
-#endif
if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
!= MAL_SUCCEED)
{
@@ -656,6 +710,43 @@
v->val.bval = b->batCacheid;
v->vtype = TYPE_bat;
BBPkeepref(b->batCacheid);
+ } else if (isaBatType(rtype)) {
+ /* binary compatible remote host, transfer BAT in binary form */
+ stream *sout;
+ stream *sin;
+ char buf[256];
+ ssize_t sz = 0, rd;
+ str err;
+ BAT *b = NULL;
+
+ /* this call should be a single transaction over the channel*/
+ mal_set_lock(c->lock, "remote.get");
+
+ /* bypass Mapi from this point to efficiently write all data to
+ * the server */
+ sout = mapi_get_to(c->mconn);
+ sin = mapi_get_from(c->mconn);
+
+ /* call our remote helper to do this more efficiently */
+ mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
+ mnstr_flush(sout);
+
+ /* read the JSON header */
+ while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz]
!= '\n') {
+ sz += rd;
+ }
+ if (rd < 0)
+ throw(MAL, "remote.get", "could not read BAT JSON
header");
+ if (buf[0] == '!')
+ return(GDKstrdup(buf));
+
+ buf[sz] = '\0';
+ if ((err = RMTinternalcopyfrom(&b, buf, sin)) != NULL)
+ return(err);
+
+ v->val.bval = b->batCacheid;
+ v->vtype = TYPE_bat;
+ BBPkeepref(b->batCacheid);
} else {
ptr p = NULL;
str val;
@@ -692,7 +783,8 @@
}
}
- mapi_close_handle(mhdl);
+ if (mhdl != NULL)
+ mapi_close_handle(mhdl);
mal_unset_lock(c->lock, "remote.get");
return(MAL_SUCCEED);
@@ -1099,6 +1191,217 @@
return(MAL_SUCCEED);
}
+@h
+remote_export str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+@c
+/**
+ * dump given BAT to stream
+ */
+str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ int bid = *(int *)getArgReference(stk, pci, 1);
+ BAT *b = BBPquickdesc(bid, FALSE);
+
+ (void)mb;
+ (void)stk;
+ (void)pci;
+
+ if (b == NULL)
+ throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
+ if (b->htype != TYPE_void)
+ throw(ILLARG, "remote.bincopyto", "only void-headed BATs are
supported");
+
+ mnstr_printf(cntxt->fdout, /*JSON*/"{"
+ "\"version\":1,"
+ "\"htype\":%d,"
+ "\"ttype\":%d,"
+ "\"seqbase\":" OIDFMT ","
+ "\"size\":" SZFMT ","
+ "\"tailsize\":" SZFMT ","
+ "\"theapsize\":" SZFMT
+ "}\n",
+ TYPE_void,
+ b->ttype,
+ b->hseqbase == oid_nil ? 0 : b->hseqbase,
+ BATcount(b),
+ BATcount(b) * b->T->width,
+ b->T->varsized ? b->T->vheap->free : 0
+ );
+ mnstr_write(cntxt->fdout, /* tail */
+ b->T->heap.base + (b->U->first * b->T->width),
+ BATcount(b) * b->T->width, 1);
+ if (b->T->varsized)
+ mnstr_write(cntxt->fdout, /* theap */
+ b->T->vheap->base,
+ b->T->vheap->free, 1);
+ mnstr_flush(cntxt->fdout);
+
+ return(MAL_SUCCEED);
+}
+
+@h
+remote_export str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+@c
+typedef struct _binbat_v1 {
+ int Htype;
+ int Ttype;
+ oid seqbase;
+ size_t size;
+ size_t tailsize;
+ size_t theapsize;
+} binbat;
+
+static inline str
+RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in)
+{
+ binbat bb = { 0, 0, 0, 0, 0, 0 };
+ char *nme = NULL;
+ char *val = NULL;
+
+ BAT *b;
+
+ /* hdr is a JSON structure that looks like
+ *
{"version":1,"htype":0,"ttype":6,"seqbase":0,"tailsize":4,"theapsize":0}
+ * we take the binary data directly from the stream */
+
+ /* could skip whitespace, but we just don't allow that */
+ if (*hdr++ != '{')
+ throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON
header");
+ while (*hdr != '\0') {
+ switch (*hdr) {
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list