Changeset: e3bf585cd53a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e3bf585cd53a
Modified Files:
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Stay closer to the structures of the SQL catalog
diffs (truncated from 420 to 300 lines):
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -54,7 +54,7 @@ static int BSKTnewEntry(void)
baskets = (BasketRec *) GDKrealloc(baskets, bsktLimit *
sizeof(BasketRec));
}
for (i = 1; i < bsktLimit; i++)
- if (baskets[i].table == NULL)
+ if (baskets[i].table_name == NULL)
break;
bsktTop++;
return i;
@@ -65,19 +65,11 @@ static int BSKTnewEntry(void)
static void
BSKTclean(int idx)
{
- int i;
- GDKfree(baskets[idx].schema);
- GDKfree(baskets[idx].table);
- baskets[idx].schema = NULL;
- baskets[idx].table = NULL;
- if (baskets[idx].cols) {
- for (i = 0; i < baskets[idx].count; i++)
- GDKfree(baskets[idx].cols[i]);
- GDKfree(baskets[idx].cols);
- baskets[idx].cols = NULL;
- }
- GDKfree(baskets[idx].bats);
- baskets[idx].bats = NULL;
+ GDKfree(baskets[idx].schema_name);
+ GDKfree(baskets[idx].table_name);
+ baskets[idx].schema_name = NULL;
+ baskets[idx].table_name = NULL;
+
BBPreclaim(baskets[idx].errors);
baskets[idx].errors = NULL;
baskets[idx].count = 0;
@@ -93,8 +85,8 @@ BSKTlocate(str sch, str tbl)
if( sch == 0 || tbl == 0)
return 0;
for (i = 1; i < bsktTop; i++)
- if (baskets[i].schema && strcmp(sch, baskets[i].schema) == 0 &&
- baskets[i].table && strcmp(tbl, baskets[i].table) == 0)
+ if (baskets[i].schema_name && strcmp(sch,
baskets[i].schema_name) == 0 &&
+ baskets[i].table_name && strcmp(tbl,
baskets[i].table_name) == 0)
return i;
return 0;
}
@@ -103,57 +95,35 @@ BSKTlocate(str sch, str tbl)
static str
BSKTnewbasket(sql_schema *s, sql_table *t)
{
- int idx, i;
+ int idx;
node *o;
- BAT *b= NULL;
- sql_column *c;
- str msg= MAL_SUCCEED;
// Don't introduce the same basket twice
if( BSKTlocate(s->base.name, t->base.name) > 0)
- return msg;
+ return MAL_SUCCEED;
MT_lock_set(&iotLock);
idx = BSKTnewEntry();
MT_lock_init(&baskets[idx].lock,"newbasket");
- baskets[idx].schema = GDKstrdup(s->base.name);
- baskets[idx].table = GDKstrdup(t->base.name);
+ baskets[idx].schema_name = GDKstrdup(s->base.name);
+ baskets[idx].table_name = GDKstrdup(t->base.name);
baskets[idx].seen = * timestamp_nil;
baskets[idx].count = 0;
for (o = t->columns.set->h; o; o = o->next)
baskets[idx].count++;
- baskets[idx].cols = GDKzalloc((baskets[idx].count + 1) * sizeof(str));
- baskets[idx].bats = GDKzalloc((baskets[idx].count + 1) * sizeof(BAT *));
baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
- if (baskets[idx].table == NULL ||
- baskets[idx].cols == NULL ||
- baskets[idx].bats == NULL ||
+ if (baskets[idx].table_name == NULL ||
baskets[idx].errors == NULL) {
BSKTclean(idx);
MT_lock_unset(&iotLock);
throw(MAL,"baskets.register",MAL_MALLOC_FAIL);
}
- i = 0;
- for (o = t->columns.set->h; o; o = o->next) {
- c = o->data;
- b= BATnew(TYPE_void, c->type.type->localtype, 0, TRANSIENT);
- if (b == NULL) {
- BSKTclean(idx);
- MT_lock_unset(&iotLock);
- throw(MAL,"baskets.register","Can not locate stream
column '%s.%s.%s'",s->base.name, t->base.name, c->base.name);
- }
- baskets[idx].bats[i] = b->batCacheid;
- if ((baskets[idx].cols[i++] = GDKstrdup(c->base.name)) == NULL)
{
- BSKTclean(idx);
- MT_lock_unset(&iotLock);
- throw(MAL,"baskets.register",MAL_MALLOC_FAIL);
- }
- BBPkeepref(b->batCacheid);
- }
+ baskets[idx].schema = s;
+ baskets[idx].table = t;
MT_lock_unset(&iotLock);
- return msg;
+ return MAL_SUCCEED;
}
// MAL/SQL interface for registration of a single table
@@ -201,10 +171,14 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
str sch = *getArgReference_str(stk,pci,1);
str tbl = *getArgReference_str(stk,pci,2);
str col = *getArgReference_str(stk,pci,3);
- int idx,i;
+ int idx;
+ mvc *m = NULL;
+ sql_schema *s = NULL;
+ sql_table *t = NULL;
+ sql_column *c = NULL;
BAT *b;
+ str msg;
- (void) cntxt;
(void) mb;
*ret = 0;
@@ -212,13 +186,21 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
if (idx <= 0)
throw(SQL,"iot.bind","Stream table '%s.%s' not
registered",sch,tbl);
- for(i=0; i < baskets[idx].count; i++)
- if( strcmp(baskets[idx].cols[i], col)== 0 ){
- b= BATdescriptor(baskets[idx].bats[i]);
- if( b)
- BBPkeepref(*ret = b->batCacheid);
- return MAL_SUCCEED;
- }
+ msg= getSQLContext(cntxt,NULL, &m, NULL);
+ if( msg != MAL_SUCCEED)
+ return msg;
+ s= mvc_bind_schema(m, sch);
+ if ( s)
+ t= mvc_bind_table(m, s, tbl);
+ if ( t)
+ c= mvc_bind_column(m, t, col);
+
+ if( c){
+ b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL);
+ if( b)
+ BBPkeepref(*ret = b->batCacheid);
+ return MAL_SUCCEED;
+ }
throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not
found",sch,tbl,col);
}
@@ -283,7 +265,7 @@ BSKTreset(void *ret)
int i;
(void) ret;
for (i = 1; i < bsktLimit; i++)
- if (baskets[i].table)
+ if (baskets[i].table_name)
BSKTclean(i);
return MAL_SUCCEED;
}
@@ -295,8 +277,9 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
str sch = *getArgReference_str(stk, pci, 1);
str tbl = *getArgReference_str(stk, pci, 2);
str dir = *getArgReference_str(stk, pci, 3);
- int bskt,i;
+ int bskt;
char buf[BUFSIZ];
+ node *n;
bskt = BSKTlocate(sch,tbl);
if (bskt == 0)
@@ -307,14 +290,13 @@ BSKTpush(Client cntxt, MalBlkPtr mb, Mal
throw(SQL, "iot.push", "Could not access the basket directory
%s. error %d",dir,errno);
}
- for(i=0; i < baskets[bskt].count ; i++){
- snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP,
baskets[bskt].cols[i]);
+ for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+ sql_column *c = n->data;
+ snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name);
_DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
}
(void) cntxt;
(void) mb;
- (void) stk;
- (void) pci;
return MAL_SUCCEED;
}
str
@@ -323,20 +305,30 @@ BSKTdump(void *ret)
int bskt;
BUN cnt;
BAT *b;
+ node *n;
+ sql_column *c;
+ mvc *m = NULL;
+ str msg = MAL_SUCCEED;
mnstr_printf(GDKout, "#baskets table\n");
for (bskt = 1; bskt < bsktLimit; bskt++)
- if (baskets[bskt].table) {
+ if (baskets[bskt].table_name) {
+ msg = getSQLContext(mal_clients, 0, &m, NULL);
+ if ( msg != MAL_SUCCEED)
+ break;
cnt = 0;
- if( baskets[bskt].bats[0]){
- b = BBPquickdesc(baskets[bskt].bats[0], TRUE);
- if( b)
- cnt = BATcount(b);
+ n = baskets[bskt].table->columns.set->h;
+ c = n->data;
+ b = store_funcs.bind_col(m->session->tr,c,RD_UPD_VAL);
+ if( b){
+ cnt = BATcount(b);
+ BBPunfix(b->batCacheid);
}
+
mnstr_printf(GDKout, "#baskets[%2d] %s.%s columns %d
threshold %d window=[%d,%d] time window=[" LLFMT "," LLFMT "] beat " LLFMT "
milliseconds" BUNFMT"\n",
bskt,
- baskets[bskt].schema,
- baskets[bskt].table,
+ baskets[bskt].schema_name,
+ baskets[bskt].table_name,
baskets[bskt].count,
baskets[bskt].threshold,
baskets[bskt].winsize,
@@ -348,7 +340,7 @@ BSKTdump(void *ret)
}
(void) ret;
- return MAL_SUCCEED;
+ return msg;
}
str
@@ -364,6 +356,10 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
InstrPtr
BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl)
{
+ (void) mb;
+ (void) sch;
+ (void) tbl;
+/*
int i, j, bskt;
InstrPtr p;
BAT *b;
@@ -383,6 +379,8 @@ BSKTupdateInstruction(MalBlkPtr mb, str
p = pushArgument(mb, p, j);
}
return p;
+*/
+ return NULL;
}
/* provide a tabular view for inspection */
@@ -437,9 +435,9 @@ BSKTtable(bat *schemaId, bat *nameId, ba
BATseqbase(timestride, 0);
for (i = 1; i < bsktTop; i++)
- if (baskets[i].table) {
- BUNappend(schema, baskets[i].schema, FALSE);
- BUNappend(name, baskets[i].table, FALSE);
+ if (baskets[i].table_name) {
+ BUNappend(schema, baskets[i].schema_name, FALSE);
+ BUNappend(name, baskets[i].table_name, FALSE);
BUNappend(threshold, &baskets[i].threshold, FALSE);
BUNappend(winsize, &baskets[i].winsize, FALSE);
BUNappend(winstride, &baskets[i].winstride, FALSE);
@@ -506,7 +504,7 @@ BSKTtableerrors(bat *nameId, bat *errorI
BATloop(baskets[i].errors, p, q)
{
str err = BUNtail(bi, p);
- BUNappend(name, &baskets[i].table, FALSE);
+ BUNappend(name, &baskets[i].table_name, FALSE);
BUNappend(error, err, FALSE);
}
}
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -41,15 +41,17 @@
typedef struct{
MT_Lock lock;
- str schema; /* schema for the basket */
- str table; /* table that represents the basket */
+ str schema_name; /* schema for the basket */
+ str table_name; /* table that represents the basket */
+ sql_schema *schema;
+ sql_table *table;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list