Changeset: 931c38cd87bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=931c38cd87bc
Modified Files:
sql/backends/monet5/iot/Tests/bug05.sql
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/Tests/iot02.sql
sql/backends/monet5/iot/Tests/iot06.stable.out
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
Branch: iot
Log Message:
Fixing bounds
diffs (truncated from 621 to 300 lines):
diff --git a/sql/backends/monet5/iot/Tests/bug05.sql
b/sql/backends/monet5/iot/Tests/bug05.sql
--- a/sql/backends/monet5/iot/Tests/bug05.sql
+++ b/sql/backends/monet5/iot/Tests/bug05.sql
@@ -17,7 +17,7 @@ INSERT INTO testing VALUES (now(), 2, 2)
INSERT INTO testing VALUES (now(), 3, 3);
CALL iot.show('sys', 'cquery');
-CALL iot.stop();
+CALL iot.pause();
DROP PROCEDURE cquery;
DROP TABLE testout;
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -35,7 +35,7 @@ select * from result;
--select * from iot.baskets();
--select * from iot.queries();
select * from iot.errors();
-call iot.stop();
+call iot.pause();
drop procedure cq00;
drop table stmp;
drop table result;
diff --git a/sql/backends/monet5/iot/Tests/iot02.sql
b/sql/backends/monet5/iot/Tests/iot02.sql
--- a/sql/backends/monet5/iot/Tests/iot02.sql
+++ b/sql/backends/monet5/iot/Tests/iot02.sql
@@ -24,14 +24,15 @@ insert into stmp2 values('2005-09-23 12:
call iot.resume('iot','cq02');
-- wait for 5 seconds for handler
-call iot.wait(5000);
select 'RESULT';
+--call iot.cycles('iot','cq02',4);
+call iot.wait(2000);
select * from stmp2;
select * from result1;
select * from result2;
-call iot.stop();
+call iot.pause();
select * from iot.errors();
drop procedure cq02;
drop table stmp2;
diff --git a/sql/backends/monet5/iot/Tests/iot06.stable.out
b/sql/backends/monet5/iot/Tests/iot06.stable.out
--- a/sql/backends/monet5/iot/Tests/iot06.stable.out
+++ b/sql/backends/monet5/iot/Tests/iot06.stable.out
@@ -39,24 +39,24 @@ Ready.
% clob # type
% 63 # length
unsafe function iot.cq06():void;
- X_1 := sql.mvc();
- X_33 := basket.register(X_1,"iot","tmp06",0);
-barrier X_56 := language.dataflow();
- C_2:bat[:oid] := basket.tid(X_1,"iot","tmp06");
- X_5:bat[:timestamp] := basket.bind(X_33,"iot","tmp06","t");
- X_8 := aggr.min(X_5);
-exit X_56;
- X_9 := sql.append(X_33,"iot","result","t",X_8);
- X_11 := aggr.count(X_5);
- X_12 := calc.int(X_11);
- X_13 := sql.append(X_9,"iot","result","sensor",X_12);
- X_15:bat[:int] := basket.bind(X_13,"iot","tmp06","val");
- X_17:bat[:dbl] := batcalc.dbl(2,X_15);
- X_19:dbl := aggr.avg(X_17);
- X_20 := calc.int(X_19,8,2);
- X_22 := sql.append(X_13,"iot","result","val",X_20);
- X_34 := basket.tumble(X_22,"iot","tmp06");
- basket.commit(X_34,"iot","tmp06");
+ X_0 := sql.mvc();
+ X_32 := basket.register(X_0,"iot","tmp06",0);
+barrier X_60 := language.dataflow();
+ C_1:bat[:oid] := basket.tid(X_0,"iot","tmp06");
+ X_4:bat[:timestamp] := basket.bind(X_32,"iot","tmp06","t");
+ X_8 := aggr.min(X_4);
+exit X_60;
+ X_10 := sql.append(X_32,"iot","result","t",X_8);
+ X_12 := aggr.count(X_4);
+ X_13 := calc.int(X_12);
+ X_15 := sql.append(X_10,"iot","result","sensor",X_13);
+ X_17:bat[:int] := basket.bind(X_15,"iot","tmp06","val");
+ X_20:bat[:dbl] := batcalc.dbl(2,X_17);
+ X_24:dbl := aggr.avg(X_20);
+ X_25 := calc.int(X_24,8,2);
+ X_28 := sql.append(X_15,"iot","result","val",X_25);
+ X_36 := basket.tumble(X_28,"iot","tmp06");
+ basket.commit(X_36,"iot","tmp06");
catch SQLexception:str;
iot.error("user","cq06",SQLexception);
exit SQLexception:str;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -352,6 +352,8 @@ BSKTtid(Client cntxt, MalBlkPtr mb, MalS
if( bskt == 0)
throw(SQL,"basket.bind","Stream table column '%s.%s' not
found\n",sch,tbl);
b = baskets[bskt].bats[0];
+ if( b == 0)
+ throw(SQL,"basket.bind","Stream table reference column '%s.%s'
not accessible\n",sch,tbl);
tids = COLnew(0, TYPE_void, 0, TRANSIENT);
if (tids == NULL)
@@ -464,6 +466,7 @@ BSKTimportInternal(Client cntxt, int bsk
assert( b);
bcnt = BATcount(b);
+ if( fsize > 0)
switch(ATOMstorage(b->ttype)){
case TYPE_bit:
case TYPE_bte:
@@ -657,9 +660,9 @@ BSKTexport(Client cntxt, MalBlkPtr mb, M
}
/* remove tuples from a basket according to the sliding policy */
-#define ColumnShift(B,TPE, STRIDE) { \
+#define ColumnShift(B,TPE, CNT) { \
TPE *first= (TPE*) Tloc(B, 0);\
- TPE *n = first+STRIDE;\
+ TPE *n = first+CNT;\
TPE *last= (TPE*) Tloc(B, BUNlast(B));\
for( ; n < last; n++, first++)\
*first=*n;\
@@ -710,7 +713,9 @@ BSKTtumbleInternal(Client cntxt, str sch
}
if( stride == -1)
BATsetcount(b, 0);
- else BATsetcount(b, BATcount(b)-cnt);
+ else
+ if( BATcount(b) >= cnt)
+ BATsetcount(b, BATcount(b)-cnt);
if( BATcount(b) == 0){
baskets[bskt].status = BSKTWAIT;
}
@@ -974,8 +979,10 @@ BSKTreset(Client cntxt, MalBlkPtr mb, Ma
MT_lock_set(&baskets[idx].lock);
for( i=0; baskets[idx].cols[i]; i++){
b = baskets[idx].bats[i];
- if(b)
+ if(b){
BATsetcount(b,0);
+ BATsettrivprop(b);
+ }
}
baskets[idx].status = BSKTWAIT;
MT_lock_unset(&baskets[idx].lock);
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -63,7 +63,6 @@ typedef struct {
str fcnname;
MalBlkPtr mb; /* Query block */
MalStkPtr stk; /* might be handy */
- Client client; /* MAL client context for this query */
int status; /* query status waiting/running/paused */
int enabled; /* all baskets are available */
@@ -192,7 +191,9 @@ PNregisterInternal(Client cntxt, MalBlkP
Symbol s;
char buf[IDLENGTH];
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n",
init);
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#registerInternal status %d\n", init);
+#endif
if (pnettop == MAXPN)
GDKerror("petrinet.register:Too many transitions");
@@ -218,20 +219,13 @@ PNregisterInternal(Client cntxt, MalBlkP
setArgType(nmb,q, 0, TYPE_void);
pushEndInstruction(nmb);
chkProgram(cntxt->fdout, cntxt->nspace, nmb);
- _DEBUG_PETRINET_ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL);
+#ifdef DEBUG_PETRINET
+ printFunction(cntxt->fdout, nmb, 0, LIST_MAL_ALL);
+#endif
pnet[pnettop].mb = nmb;
pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
- if(pnet[pnettop].client == NULL) {
- pnet[pnettop].client = MCinitClient(0,0,0);
- if (pnet[pnettop].client == NULL)
- throw(MAL,"petrinet.register","Failed to create client
record for continous query\n");
- msg = SQLinitClient(pnet[pnettop].client);
- if(msg)
- return msg;
- }
-
pnet[pnettop].status = PNWAIT;
pnet[pnettop].limit = calls;
pnet[pnettop].seen = *timestamp_nil;
@@ -266,13 +260,17 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
throw(SQL,"iot.pause","Continuous query %s.%s not
found\n", modname, fcnname);
}
pnet[i].status = newstatus;
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s
%s\n", modname, fcnname, statusname[newstatus]);
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#scheduler status %s.%s %s\n", modname, fcnname,
statusname[newstatus]);
+#endif
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
}
for ( i = 0; i < pnettop; i++){
pnet[i].status = newstatus;
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s:
%s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#scheduler status %s.%s: %s\n",
pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+#endif
}
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
@@ -280,28 +278,39 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
str
PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler\n");
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#resume scheduler\n");
+#endif
return PNstatus(cntxt, mb, stk, pci, PNWAIT);
}
str
PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#pause scheduler\n");
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#pause scheduler\n");
+#endif
return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
}
str
PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
+#ifdef DEBUG_PETRINET
int old = PNcycle;
+#endif
int delay= *getArgReference_int(stk,pci,1);
lng clk = GDKms();
+ (void) cntxt;
(void) mb;
- _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#scheduler wait %d
ms\n",delay);
+#ifdef DEBUG_PETRINET
+ mnstr_printf(cntxt->fdout, "#scheduler wait %d ms\n",delay);
+#endif
delay = delay < PNDELAY? 2*PNDELAY:delay;
while( GDKms() < clk + delay )
MT_sleep_ms(PNDELAY);
- _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#wait finished after %d
cycles\n",PNcycle -old );
+#ifdef DEBUG_PETRINET
+ mnstr_printf(cntxt->fdout, "#wait finished after %d cycles\n",PNcycle
-old );
+#endif
return MAL_SUCCEED;
}
@@ -312,7 +321,6 @@ PNderegisterInternal(int i){
MT_lock_set(&iotLock);
GDKfree(pnet[i].modname);
GDKfree(pnet[i].fcnname);
- //MCcloseClient(pnet[i].client);
memset((void*) (pnet+i), 0, sizeof(PNnode));
for( ; i<pnettop-1; i++)
pnet[i] = pnet[i+1];
@@ -338,25 +346,30 @@ PNderegister(Client cntxt, MalBlkPtr mb,
throw(SQL,"iot.pause","Continuous query %s.%s not
found\n", modname, fcnname);
}
PNderegisterInternal(i);
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered
%s.%s\n", modname, fcnname);
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#scheduler deregistered %s.%s\n",
modname, fcnname);
+#endif
return MAL_SUCCEED;
}
for ( i = pnettop-1; i >= 0 ; i--)
PNderegisterInternal(i);
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n");
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#scheduler deregistered all\n");
+#endif
return MAL_SUCCEED;
}
/* safely stop the engine by stopping all CQ firt */
str
PNstop(void){
- int i, cnt,limit = 20;
- _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n");
+ int i, cnt,limit = 200;
+#ifdef DEBUG_PETRINET
+ mnstr_printf(GDKout, "#scheduler being stopped\n");
+#endif
+ MT_lock_set(&iotLock);
pnstatus = PNSTOP; // avoid starting new continuous queries
- for( i = 0; i < pnettop; i++)
- if( pnet[i].client )
- pnet[i].client->itrace ='x';
+ MT_lock_unset(&iotLock);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list