Changeset: 086a0278a6a8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=086a0278a6a8
Modified Files:
clients/Tests/malcheck.stable.out
monetdb5/optimizer/opt_iot.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.h
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Intermittent commit
- add delete code
- add transaction brackets
- improve optimizer
- reduce lock management
diffs (truncated from 767 to 300 lines):
diff --git a/clients/Tests/malcheck.stable.out
b/clients/Tests/malcheck.stable.out
--- a/clients/Tests/malcheck.stable.out
+++ b/clients/Tests/malcheck.stable.out
@@ -10,6 +10,7 @@ BSKTthreshold: missing for MAL command t
BSKTwindow: missing for MAL command window in
sql/backends/monet5/iot/basket.mal
BSKTtimewindow: missing for MAL command timewindow in
sql/backends/monet5/iot/basket.mal
BSKTbeat: missing for MAL command beat in sql/backends/monet5/iot/basket.mal
+IOTstop: missing for MAL pattern step in sql/backends/monet5/iot/iot.mal
PNstep: missing for MAL pattern step in sql/backends/monet5/iot/petrinet.mal
# 15:16:26 >
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -45,16 +45,17 @@
int
OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
- int mvc=0;
int i, j, k, fnd, limit, slimit;
InstrPtr r, p, *old;
int *alias;
str schemas[MAXBSKT];
str tables[MAXBSKT];
+ int mvc[MAXBSKT];
+ int done[MAXBSKT]= {0};
int btop=0;
+ int commit =0;
(void) pci;
- (void) mvc;
old = mb->stmt;
limit = mb->stop;
@@ -63,11 +64,31 @@ OPTiotImplementation(Client cntxt, MalBl
/* first analyse the query for streaming tables */
for (i = 1; i < limit && btop <MAXBSKT; i++){
p = old[i];
- if( getModuleId(p)== basketRef && getFunctionId(p)==
registerRef ){
+ if( getModuleId(p)== basketRef && (getFunctionId(p)==
registerRef || getFunctionId(p)== bindRef || getFunctionId(p)== clear_tableRef)
){
OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream
table %s.%s\n", getModuleId(p), getFunctionId(p));
schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
- btop++;
+ mvc[btop] = getArg(p,0);
+ for( j =0; j< btop ; j++)
+ if( strcmp(schemas[j], schemas[j+1])==0 &&
strcmp(tables[j],tables[j+1]) ==0)
+ break;
+ mvc[j] = getArg(p,0);
+ done[j]= done[j]== 0 || getFunctionId(p)== registerRef;
+ if( j == btop)
+ btop++;
+ }
+ if( getModuleId(p)== basketRef && (getFunctionId(p) ==
appendRef || getFunctionId(p) == deleteRef )){
+ OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream
table %s.%s\n", getModuleId(p), getFunctionId(p));
+ schemas[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+ tables[btop]= getVarConstant(mb, getArg(p,3)).val.sval;
+ mvc[btop] = getArg(p,0);
+ for( j =0; j< btop ; j++)
+ if( strcmp(schemas[j], schemas[j+1])==0 &&
strcmp(tables[j],tables[j+1]) ==0)
+ break;
+
+ mvc[j] = getArg(p,0);
+ if( j == btop)
+ btop++;
}
}
if( btop == MAXBSKT || btop == 0)
@@ -87,10 +108,22 @@ OPTiotImplementation(Client cntxt, MalBl
return 0;
pushInstruction(mb, old[0]);
+ // register all baskets used
+ for( j=0; j<btop; j++)
+ if( done[j]==0) {
+ p= newStmt(mb,basketRef,registerRef);
+ p= pushStr(mb,p, schemas[j]);
+ p= pushStr(mb,p, tables[j]);
+ }
+ p= newStmt(mb, sqlRef, transactionRef);
for (i = 1; i < limit; i++)
if (old[i]) {
p = old[i];
+ if(getModuleId(p) == sqlRef && getFunctionId(p)==
transactionRef){
+ freeInstruction(p);
+ continue;
+ }
if (getModuleId(p) == sqlRef && getFunctionId(p) ==
tidRef ){
isstream(getVarConstant(mb,getArg(p,2)).val.sval,
getVarConstant(mb,getArg(p,3)).val.sval );
if( fnd){
@@ -106,11 +139,24 @@ OPTiotImplementation(Client cntxt, MalBl
}
if (getModuleId(p) == sqlRef && getFunctionId(p) ==
affectedRowsRef ){
+ for(j = 0; j < btop; j++){
+ r = newStmt(mb, basketRef, commitRef);
+ if (alias[mvc[j]] > 0)
+ r = pushArgument(mb,r,
alias[mvc[j]]);
+ else
+ r = pushArgument(mb,r, mvc[j]);
+ r = pushStr(mb,r, schemas[j]);
+ r = pushStr(mb,r, tables[j]);
+ }
freeInstruction(p);
continue;
}
- if (p->token == ENDsymbol && btop > 0) {
+ if( getModuleId(p)== sqlRef && getFunctionId(p)
==commitRef)
+ commit++;
+ if (p->token == ENDsymbol && btop > 0 && commit == 0) {
+ commit++;
+ (void) newStmt(mb, sqlRef, commitRef);
/* catch any exception left behind */
r = newAssignment(mb);
j = getArg(r, 0) = newVariable(mb,
GDKstrdup("SQLexception"), TYPE_str);
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -225,6 +225,7 @@ str unpinRef;
str updateRef;
str subselectRef;
str timestampRef;
+str transactionRef;
str thetasubselectRef;
str likesubselectRef;
str ilikesubselectRef;
@@ -433,6 +434,7 @@ void optimizerInit(void)
subsortRef = putName("subsort",7);
takeRef= putName("take",5);
timestampRef = putName("timestamp", 9);
+ transactionRef = putName("transaction", 11);
not_uniqueRef= putName("not_unique",10);
sampleRef= putName("sample",6);
subuniqueRef= putName("subunique",9);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -209,6 +209,7 @@ opt_export str subsumRef;
opt_export str subavgRef;
opt_export str subsortRef;
opt_export str timestampRef;
+opt_export str transactionRef;
opt_export str takeRef;
opt_export str not_uniqueRef;
opt_export str sampleRef;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -36,7 +36,7 @@
//#define _DEBUG_BASKET_ if(0)
#define _DEBUG_BASKET_
-str statusname[3] = { "<unknown>", "running", "paused" };
+str statusname[4] = { "<unknown>", "running", "paused", "locked" };
BasketRec *baskets; /* the global iot catalog */
static int bsktTop = 0, bsktLimit = 0;
@@ -73,7 +73,6 @@ BSKTclean(int idx)
BBPreclaim(baskets[idx].errors);
baskets[idx].errors = NULL;
baskets[idx].count = 0;
- MT_lock_destroy(&baskets[idx].lock);
}
// locate the basket in the catalog
@@ -101,21 +100,23 @@ BSKTnewbasket(sql_schema *s, sql_table *
// Don't introduce the same basket twice
if( BSKTlocate(s->base.name, t->base.name) > 0)
return MAL_SUCCEED;
- //MT_lock_set(&iotLock);
+ MT_lock_set(&iotLock);
idx = BSKTnewEntry();
- MT_lock_init(&baskets[idx].lock,"newbasket");
baskets[idx].schema_name = GDKstrdup(s->base.name);
baskets[idx].table_name = GDKstrdup(t->base.name);
baskets[idx].seen = * timestamp_nil;
+ baskets[idx].status = BSKTPAUSED;
baskets[idx].count = 0;
for (o = t->columns.set->h; o; o = o->next){
sql_column *col = o->data;
int tpe = col->type.type->localtype;
- if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime ||
tpe == TYPE_timestamp) )
+ if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime ||
tpe == TYPE_timestamp) ){
+ MT_lock_unset(&iotLock);
throw(MAL,"baskets.register","Unsupported type %d",tpe);
+ }
baskets[idx].count++;
}
// collect the column names
@@ -136,7 +137,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
baskets[idx].schema = s;
baskets[idx].table = t;
- //MT_lock_unset(&iotLock);
+ MT_lock_unset(&iotLock);
return MAL_SUCCEED;
}
@@ -194,16 +195,17 @@ BSKTactivate(Client cntxt, MalBlkPtr mb,
/* check for registration */
idx = BSKTlocate(sch, tbl);
if( idx == 0)
- throw(SQL,"basket.activate","Stream table %s.%s not
accessible\n",sch,tbl);
- MT_lock_set(&baskets[idx].lock);
- baskets[idx].status = BSKTRUNNING;
- MT_lock_unset(&baskets[idx].lock);
+ throw(SQL,"basket.activate","Stream table %s.%s not
accessible to activate\n",sch,tbl);
+ if( baskets[idx].status == BSKTPAUSED){
+ MT_lock_set(&iotLock);
+ baskets[idx].status = BSKTRUNNING;
+ MT_lock_unset(&iotLock);
+ }
} else {
- for( idx =1; idx <bsktTop; idx++){
- MT_lock_set(&baskets[idx].lock);
+ MT_lock_set(&iotLock);
+ for( idx =1; idx <bsktTop; idx++)
baskets[idx].status = BSKTRUNNING;
- MT_lock_unset(&baskets[idx].lock);
- }
+ MT_lock_unset(&iotLock);
}
return MAL_SUCCEED;
}
@@ -223,16 +225,17 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
/* check for registration */
idx = BSKTlocate(sch, tbl);
if( idx == 0)
- throw(SQL,"basket.activate","Stream table %s.%s not
accessible\n",sch,tbl);
- MT_lock_set(&baskets[idx].lock);
- baskets[idx].status = BSKTPAUSED;
- MT_lock_unset(&baskets[idx].lock);
+ throw(SQL,"basket.activate","Stream table %s.%s not
accessible to deactivate\n",sch,tbl);
+ if( baskets[idx].status == BSKTRUNNING){
+ MT_lock_set(&iotLock);
+ baskets[idx].status = BSKTPAUSED;
+ MT_lock_unset(&iotLock);
+ }
} else {
- for( idx =1; idx <bsktTop; idx++){
- MT_lock_set(&baskets[idx].lock);
+ MT_lock_set(&iotLock);
+ for( idx =1; idx <bsktTop; idx++)
baskets[idx].status = BSKTPAUSED;
- MT_lock_unset(&baskets[idx].lock);
- }
+ MT_lock_unset(&iotLock);
}
return MAL_SUCCEED;
}
@@ -279,45 +282,6 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
throw(SQL,"iot.bind","Stream table column '%s.%s.%s' not
found",sch,tbl,col);
}
-/*
- * The locks are designated towards the baskets.
- * If you can not grab the lock then we have to wait.
- */
-str BSKTlock(void *ret, str *sch, str *tbl, int *delay)
-{
- int bskt;
-
- bskt = BSKTlocate(*sch, *tbl);
- if (bskt <= 0)
- throw(SQL, "basket.lock", "Could not find the basket
%s.%s",*sch,*tbl);
- _DEBUG_BASKET_ mnstr_printf(BSKTout, "lock group %s.%s\n", *sch, *tbl);
- MT_lock_set(&baskets[bskt].lock);
- _DEBUG_BASKET_ mnstr_printf(BSKTout, "got group locked %s.%s\n", *sch,
*tbl);
- (void) delay; /* control spinlock */
- (void) ret;
- return MAL_SUCCEED;
-}
-
-
-str BSKTlock2(void *ret, str *sch, str *tbl)
-{
- int delay = 0;
- return BSKTlock(ret, sch, tbl, &delay);
-}
-
-str BSKTunlock(void *ret, str *sch,str *tbl)
-{
- int bskt;
-
- (void) ret;
- bskt = BSKTlocate(*sch,*tbl);
- if (bskt == 0)
- throw(SQL, "basket.lock", "Could not find the basket
%s.%s",*sch,*tbl);
- MT_lock_unset(&baskets[bskt].lock);
- return MAL_SUCCEED;
-}
-
-
str
BSKTdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list