Changeset: 0ed931aa1c33 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0ed931aa1c33
Modified Files:
monetdb5/optimizer/opt_iot.c
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
First full round
- use wrapper to isolate transaction stuff
diffs (97 lines):
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -53,7 +53,7 @@ OPTiotImplementation(Client cntxt, MalBl
int mvc[MAXBSKT];
int done[MAXBSKT]= {0};
int btop=0;
- int commit =0;
+ int noerror=0;
(void) pci;
@@ -115,7 +115,6 @@ OPTiotImplementation(Client cntxt, MalBl
p= pushStr(mb,p, schemas[j]);
p= pushStr(mb,p, tables[j]);
}
- p= newStmt(mb, sqlRef, transactionRef);
for (i = 1; i < limit; i++)
if (old[i]) {
p = old[i];
@@ -152,11 +151,9 @@ OPTiotImplementation(Client cntxt, MalBl
continue;
}
- if( getModuleId(p)== sqlRef && getFunctionId(p) ==commitRef)
- commit++;
- if (p->token == ENDsymbol && btop > 0 && commit == 0) {
- commit++;
- (void) newStmt(mb, sqlRef, commitRef);
+ if( getModuleId(p)== iotRef && getFunctionId(p)==errorRef)
+ noerror++;
+ if (p->token == ENDsymbol && btop > 0 && noerror==0) {
/* catch any exception left behind */
r = newAssignment(mb);
j = getArg(r, 0) = newVariable(mb,
GDKstrdup("SQLexception"), TYPE_str);
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -25,6 +25,7 @@ select * from stream_tmp;
-- reactivate all continuous queries
call iot.activate();
+select * from result;
select * from iot.baskets();
select * from iot.queries();
diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -41,7 +41,7 @@ pattern delete(mvc:int, sch:str, tbl:str
address BSKTdelete
comment "Append new tuples to a basket";
-pattern commit(mvc:int, sch:str, tbl:str):void
+pattern commit(mvc:any, sch:str, tbl:str):void
address BSKTcommit
comment "Make the basket available for the scheduler";
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -132,8 +132,11 @@ str
PNregisterInternal(Client cntxt, MalBlkPtr mb)
{
int i, init= pnettop == 0;
- InstrPtr sig;
+ InstrPtr sig,q;
str msg = MAL_SUCCEED;
+ MalBlkPtr nmb;
+ Symbol s;
+ char buf[IDLENGTH];
_DEBUG_PETRINET_ mnstr_printf(PNout, "#registerInternal status %d\n",
init);
if (pnettop == MAXPN)
@@ -146,8 +149,20 @@ PNregisterInternal(Client cntxt, MalBlkP
pnet[pnettop].modname = GDKstrdup(getModuleId(sig));
pnet[pnettop].fcnname = GDKstrdup(getFunctionId(sig));
- pnet[pnettop].mb = mb;
- pnet[pnettop].stk = prepareMALstack(mb, mb->vsize);
+ snprintf(buf,IDLENGTH,"petri_%d",pnettop);
+ s = newFunction("iot", buf, FUNCTIONsymbol);
+ nmb = s->def;
+ setArgType(nmb, nmb->stmt[0],0, TYPE_void);
+ (void) newStmt(nmb, sqlRef, transactionRef);
+ (void) newStmt(nmb,pnet[pnettop].modname, pnet[pnettop].fcnname);
+ q= newStmt(nmb, sqlRef, commitRef);
+ setArgType(nmb,q, 0, TYPE_void);
+ pushEndInstruction(nmb);
+ chkProgram(cntxt->fdout, cntxt->nspace, nmb);
+ _DEBUG_PETRINET_ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL);
+
+ pnet[pnettop].mb = nmb;
+ pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
pnet[pnettop].status = PNPAUSED;
pnet[pnettop].cycles = 0;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list