Changeset: 0f8bd066e580 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/0f8bd066e580
Modified Files:
monetdb5/modules/mal/remote.c
Branch: Mar2025
Log Message:
Use inspect to get the list of atoms from the remote server. Use this
list to remap type IDs. This resolves incompatibilities between servers
that support different sets of types.
diffs (155 lines):
diff --git a/monetdb5/modules/mal/remote.c b/monetdb5/modules/mal/remote.c
--- a/monetdb5/modules/mal/remote.c
+++ b/monetdb5/modules/mal/remote.c
@@ -88,12 +88,13 @@
#define RMTT_64_OIDS (1<<3)
#define RMTT_HGE (1<<4)
+#define MAXTYPE 64
typedef struct _connection {
MT_Lock lock; /* lock to avoid interference */
str name; /* the handle for this
connection */
Mapi mconn; /* the Mapi handle for
the connection */
unsigned char type; /* binary profile of the
connection target */
- bool int128; /* has int128 support */
+ int typemap[MAXTYPE]; /* map types from
remote back to local types */
size_t nextid; /* id counter */
struct _connection *next; /* the next connection in the list */
} *connection;
@@ -108,7 +109,6 @@ static MT_Lock mal_remoteLock = MT_LOCK_
static connection conns = NULL;
static unsigned char localtype = 0177;
-static bool int128 = false;
static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn,
const char *query);
@@ -284,36 +284,28 @@ RMTconnectScen(str *ret,
c->next = conns;
conns = c;
- msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
+ msg = RMTquery(&hdl, "remote.connect", m, "x := inspect.getAtomNames();
io.print(x);");
if (msg) {
MT_lock_unset(&mal_remoteLock);
return msg;
}
- if (hdl != NULL && mapi_fetch_row(hdl)) {
- char *val = mapi_fetch_field(hdl, 0);
- c->type = (unsigned char) atoi(val);
- mapi_close_handle(hdl);
- } else {
- c->type = 0;
+ int i = 0;
+ while (hdl != NULL && mapi_fetch_row(hdl)) {
+ if (i>=MAXTYPE) {
+ mapi_close_handle(hdl);
+ GDKfree(c);
+ mapi_destroy(m);
+ MT_lock_unset(&mal_remoteLock);
+ throw(MAL, "remote.connect", "too many types");
+ }
+ char *type = mapi_fetch_field(hdl, 1);
+ c->typemap[i++] = ATOMindex(type);
}
-
+ c->type = localtype;
+ mapi_close_handle(hdl);
#ifdef _DEBUG_MAPI_
mapi_trace(c->mconn, true);
#endif
- if (c->type != localtype && (c->type | RMTT_HGE) == localtype) {
- /* we support hge, and for remote, we don't know */
- msg = RMTquery(&hdl, "remote.connect", m, "x := 0:hge;");
- if (msg) {
- freeException(msg);
- c->int128 = false;
- } else {
- mapi_close_handle(hdl);
- c->int128 = true;
- c->type |= RMTT_HGE;
- }
- } else if (c->type == localtype) {
- c->int128 = int128;
- }
MT_lock_unset(&mal_remoteLock);
*ret = GDKstrdup(conn);
@@ -514,7 +506,6 @@ RMTprelude(void)
#endif
#ifdef HAVE_HGE
type |= RMTT_HGE;
- int128 = true;
#endif
localtype = (unsigned char) type;
@@ -584,7 +575,7 @@ typedef struct _binbat_v1 {
} binbat;
static str
-RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, bool
cint128)
+RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush, int
*typemap)
{
binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
char *nme = NULL;
@@ -595,7 +586,6 @@ RMTinternalcopyfrom(BAT **ret, char *hdr
BAT *b;
- (void) cint128;
/* hdr is a JSON structure that looks like
* {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
* we take the binary data directly from the stream */
@@ -662,6 +652,12 @@ RMTinternalcopyfrom(BAT **ret, char *hdr
throw(MAL, "remote.bincopyfrom",
"bad %s value: GDK
atom number %s doesn't exist",
nme, val);
+ if (lv >= 0 && typemap)
+ lv = typemap[lv];
+ if (lv < 0)
+ throw(MAL, "remote.bincopyfrom",
+ "bad %s value: GDK
atom number %s doesn't exist",
+ nme, val);
bb.Ttype = (int) lv;
} else if (strcmp(nme, "tseqbase") == 0) {
#if SIZEOF_OID < SIZEOF_LNG
@@ -699,12 +695,6 @@ RMTinternalcopyfrom(BAT **ret, char *hdr
}
hdr++;
}
-#ifdef HAVE_HGE
- if (int128 && !cint128 && bb.Ttype >= TYPE_hge)
- bb.Ttype++;
-#else
- (void) cint128;
-#endif
b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT,
bb.size > 0 ? (uint16_t) (bb.tailsize /
bb.size) : 0);
@@ -904,7 +894,7 @@ RMTget(Client cntxt, MalBlkPtr mb, MalSt
return tmp;
}
- if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->int128))
!= MAL_SUCCEED) {
+ if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true, c->typemap))
!= MAL_SUCCEED) {
MT_lock_unset(&c->lock);
return (tmp);
}
@@ -1405,7 +1395,7 @@ RMTexec(Client cntxt, MalBlkPtr mb, MalS
BAT *b = NULL;
if ((tmp = RMTreadbatheader(sin, buf)) !=
MAL_SUCCEED ||
- (tmp = RMTinternalcopyfrom(&b, buf,
sin, i == fields - 1, c->int128)) != MAL_SUCCEED) {
+ (tmp = RMTinternalcopyfrom(&b, buf,
sin, i == fields - 1, c->typemap)) != MAL_SUCCEED) {
break;
}
@@ -1619,7 +1609,7 @@ RMTbincopyfrom(Client cntxt, MalBlkPtr m
cntxt->fdin->buf[cntxt->fdin->len] = '\0';
err = RMTinternalcopyfrom(&b,
- &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s,
true, int128 /* library should be compatible */);
+ &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s,
true, NULL /* library should be compatible */);
/* skip the JSON line */
cntxt->fdin->pos = ++cntxt->fdin->len;
if (err !=MAL_SUCCEED)
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]