Changeset: 391b3bd2e7b5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=391b3bd2e7b5
Modified Files:
clients/Tests/exports.stable.out
clients/Tests/malcheck.stable.out
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.h
sql/backends/monet5/iot/iot.mal
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
sql/backends/monet5/sql_optimizer.c
sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.err
sql/test/BugTracker-2016/Tests/stream_table_crash.Bug-3952.stable.out
Branch: iot
Log Message:
Intermittent commit
diffs (truncated from 813 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -2043,6 +2043,7 @@ void dumpExceptionsToStream(stream *out,
void dumpHelpTable(stream *f, Module s, str text, int flag);
void dumpSearchTable(stream *f, str text);
str eqRef;
+str errorRef;
str escape_str(str *retval, str s);
str evalFile(Client c, str fname, int listing);
str evalRef;
diff --git a/clients/Tests/malcheck.stable.out b/clients/Tests/malcheck.stable.out
--- a/clients/Tests/malcheck.stable.out
+++ b/clients/Tests/malcheck.stable.out
@@ -10,7 +10,6 @@ BSKTthreshold: missing for MAL command t
BSKTwindow: missing for MAL command window in sql/backends/monet5/iot/basket.mal
BSKTtimewindow: missing for MAL command timewindow in sql/backends/monet5/iot/basket.mal
BSKTbeat: missing for MAL command beat in sql/backends/monet5/iot/basket.mal
-PNtype: missing for MAL pattern types in sql/backends/monet5/iot/petrinet.mal
PNstep: missing for MAL pattern step in sql/backends/monet5/iot/petrinet.mal
# 15:16:26 >
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -24,42 +24,33 @@ create procedure iot.query(qry string)
create procedure iot.query("schema" string, name string)
external name iot.query;
--- pause the processing of a continuous query
-create procedure iot.pause ("schema" string, name string)
- external name iot.pause;
+create procedure iot.activate("schema" string, name string)
+ external name iot.activate;
-create procedure iot.pause ()
- external name iot.pause;
+create procedure iot.activate()
+ external name iot.activate;
--- resume the processing of a continuous query
-create procedure iot.resume ("schema" string, name string)
- external name iot.resume;
+create procedure iot.deactivate("schema" string, name string)
+ external name iot.deactivate;
-create procedure iot.resume ()
- external name iot.resume;
+create procedure iot.deactivate()
+ external name iot.deactivate;
-- resume with limited the number of scheduler before next pause
create procedure iot.cycles(n integer)
external name iot.cycles;
--- stop and remove a continuous query
-create procedure iot.stop ("schema" string, name string)
- external name iot.stop;
-
-create procedure iot.stop ()
- external name iot.stop;
-
-- deliver a new basket with tuples
create procedure iot.push("schema" string, "table" string, dirpath string)
external name iot.push;
-- Inspection tables
create function iot.baskets()
-returns table( "schema" string, "table" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int)
+returns table( "schema" string, "table" string, "status" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int)
external name iot.baskets;
create function iot.queries()
- returns table( "schema" string, "function" string, status string, lastrun timestamp, cycles int, events int, time bigint, error string)
+ returns table( "schema" string, "function" string, "status" string, lastrun timestamp, cycles int, events int, time bigint, error string)
external name iot.queries;
create function iot.inputplaces()
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -12,9 +12,19 @@ end;
call iot.query('iot','cq00');
call iot.query('insert into iot.result select min(t), count(*), avg(val) from iot.stream_tmp;');
-insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34);
-
select * from iot.baskets();
select * from iot.queries();
select * from iot.inputplaces();
select * from iot.outputplaces();
+
+-- stop all continuous queries
+call iot.deactivate();
+
+insert into stream_tmp values('2005-09-23 12:34:26.736',1,12.34);
+select * from stream_tmp;
+
+-- reactivate all continuous queries
+call iot.activate();
+
+select * from iot.baskets();
+select * from iot.queries();
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -36,7 +36,7 @@
//#define _DEBUG_BASKET_ if(0)
#define _DEBUG_BASKET_
-str statusname[6] = { "<unknown>", "init", "paused", "running", "stop", "error" };
+str statusname[3] = { "<unknown>", "running", "paused" };
BasketRec *baskets; /* the global iot catalog */
static int bsktTop = 0, bsktLimit = 0;
@@ -178,6 +178,65 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
return msg;
}
+str
+BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch, tbl;
+ int idx = 0;
+
+ (void) cntxt;
+ (void) mb;
+
+ if( pci->argc > pci->retc){
+ sch = *getArgReference_str(stk, pci, 1);
+ tbl = *getArgReference_str(stk, pci, 2);
+
+ /* check for registration */
+ idx = BSKTlocate(sch, tbl);
+ if( idx == 0)
+ throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl);
+ MT_lock_set(&baskets[idx].lock);
+ baskets[idx].status = BSKTRUNNING;
+ MT_lock_unset(&baskets[idx].lock);
+ } else {
+ for( idx =1; idx <bsktTop; idx++){
+ MT_lock_set(&baskets[idx].lock);
+ baskets[idx].status = BSKTRUNNING;
+ MT_lock_unset(&baskets[idx].lock);
+ }
+ }
+ return MAL_SUCCEED;
+}
+
+str
+BSKTdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch, tbl;
+ int idx = 0;
+
+ (void) cntxt;
+ (void) mb;
+ if( pci->argc > pci->retc){
+ sch = *getArgReference_str(stk, pci, 1);
+ tbl = *getArgReference_str(stk, pci, 2);
+
+ /* check for registration */
+ idx = BSKTlocate(sch, tbl);
+ if( idx == 0)
+ throw(SQL,"basket.activate","Stream table %s.%s not accessible\n",sch,tbl);
+ MT_lock_set(&baskets[idx].lock);
+ baskets[idx].status = BSKTPAUSED;
+ MT_lock_unset(&baskets[idx].lock);
+ } else {
+ for( idx =1; idx <bsktTop; idx++){
+ MT_lock_set(&baskets[idx].lock);
+ baskets[idx].status = BSKTPAUSED;
+ MT_lock_unset(&baskets[idx].lock);
+ }
+ }
+ return MAL_SUCCEED;
+}
+
static BAT *
BSKTbindColumn(Client cntxt, str sch, str tbl, str col)
{
@@ -435,12 +494,32 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
return MAL_SUCCEED;
}
-InstrPtr
-BSKTupdateInstruction(MalBlkPtr mb, str sch, str tbl)
+str
+BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
+ str sname = *getArgReference_str(stk, pci, 2);
+ str tname = *getArgReference_str(stk, pci, 3);
+ int idx;
+ (void) cntxt;
(void) mb;
- (void) sch;
- (void) tbl;
+
+ idx = BSKTlocate(sname,tname);
+ if( idx == 0)
+ throw(SQL,"basket.commit","Stream column %s.%s not accessible\n",sname,tname);
+
+ MT_lock_set(&baskets[idx].lock);
+ baskets[idx].count++;
+ MT_lock_unset(&baskets[idx].lock);
+ return MAL_SUCCEED;
+}
+
+str
+BSKTupdate (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
return NULL;
}
@@ -450,15 +529,16 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
{
bat *schemaId = getArgReference_bat(stk,pci,0);
bat *nameId = getArgReference_bat(stk,pci,1);
- bat *thresholdId = getArgReference_bat(stk,pci,1);
- bat *winsizeId = getArgReference_bat(stk,pci,2);
- bat *winstrideId = getArgReference_bat(stk,pci,3);
- bat *timesliceId = getArgReference_bat(stk,pci,4);
- bat *timestrideId = getArgReference_bat(stk,pci,5);
- bat *beatId = getArgReference_bat(stk,pci,6);
- bat *seenId = getArgReference_bat(stk,pci,7);
- bat *eventsId = getArgReference_bat(stk,pci,8);
- BAT *schema = NULL, *name = NULL, *seen = NULL, *events = NULL;
+ bat *statusId = getArgReference_bat(stk,pci,2);
+ bat *thresholdId = getArgReference_bat(stk,pci,3);
+ bat *winsizeId = getArgReference_bat(stk,pci,4);
+ bat *winstrideId = getArgReference_bat(stk,pci,5);
+ bat *timesliceId = getArgReference_bat(stk,pci,6);
+ bat *timestrideId = getArgReference_bat(stk,pci,7);
+ bat *beatId = getArgReference_bat(stk,pci,8);
+ bat *seenId = getArgReference_bat(stk,pci,9);
+ bat *eventsId = getArgReference_bat(stk,pci,10);
+ BAT *schema = NULL, *name = NULL, *status = NULL, *seen = NULL, *events = NULL;
BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
BAT *timeslice = NULL, *timestride = NULL;
int i;
@@ -473,7 +553,11 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
name = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
if (name == 0)
goto wrapup;
- BATseqbase(name, 0);
+ BATseqbase(status, 0);
+ status = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+ if (status == 0)
+ goto wrapup;
+ BATseqbase(status, 0);
threshold = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
if (threshold == 0)
goto wrapup;
@@ -486,6 +570,14 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
if (winstride == 0)
goto wrapup;
BATseqbase(winstride, 0);
+ timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
+ if (timeslice == 0)
+ goto wrapup;
+ BATseqbase(timeslice, 0);
+ timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
+ if (timestride == 0)
+ goto wrapup;
+ BATseqbase(timestride, 0);
beat = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
if (beat == 0)
goto wrapup;
@@ -499,33 +591,27 @@ BSKTtable (Client cntxt, MalBlkPtr mb, M
goto wrapup;
BATseqbase(events, 0);
- timeslice = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
- if (timeslice == 0)
- goto wrapup;
- BATseqbase(timeslice, 0);
- timestride = BATnew(TYPE_void, TYPE_int, BATTINY, TRANSIENT);
- if (timestride == 0)
- goto wrapup;
- BATseqbase(timestride, 0);
for (i = 1; i < bsktTop; i++)
if (baskets[i].table_name) {
BUNappend(schema, baskets[i].schema_name, FALSE);
BUNappend(name, baskets[i].table_name, FALSE);
+ BUNappend(status, statusname[baskets[i].status], FALSE);
BUNappend(threshold, &baskets[i].threshold, FALSE);
BUNappend(winsize, &baskets[i].winsize, FALSE);
BUNappend(winstride, &baskets[i].winstride, FALSE);
+ BUNappend(timeslice, &baskets[i].timeslice, FALSE);
+ BUNappend(timestride, &baskets[i].timestride, FALSE);
BUNappend(beat, &baskets[i].beat, FALSE);
BUNappend(seen, &baskets[i].seen, FALSE);
bn = BSKTbindColumn(cntxt,baskets[i].schema_name, baskets[i].table_name, baskets[i].cols[0]);
baskets[i].events = bn ? (int) BATcount( bn): 0;
BUNappend(events, &baskets[i].events, FALSE);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list