Changeset: 5d6709b593e5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5d6709b593e5
Modified Files:
        sql/backends/monet5/datacell/50_datacell.sql
        sql/backends/monet5/datacell/Tests/scenario00.sql
        sql/backends/monet5/datacell/basket.c
        sql/backends/monet5/datacell/basket.h
        sql/backends/monet5/datacell/datacell.c
        sql/backends/monet5/datacell/datacell.h
        sql/backends/monet5/datacell/datacell.mal
        sql/backends/monet5/datacell/emitter.c
        sql/backends/monet5/datacell/emitter.h
        sql/backends/monet5/datacell/emitter.mal
        sql/backends/monet5/datacell/petrinet.c
        sql/backends/monet5/datacell/receptor.c
        sql/backends/monet5/datacell/receptor.h
        sql/backends/monet5/datacell/receptor.mal
Branch: default
Log Message:

Simplifying code base
The interface has been simplified. The receptors, emitters, basket
and queries status can be inspected through a materialized view.


diffs (truncated from 1979 to 300 lines):

diff --git a/sql/backends/monet5/datacell/50_datacell.sql 
b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -26,27 +26,21 @@ create procedure datacell.receptor(tbl s
 create procedure datacell.emitter(tbl string, host string, portid integer)
     external name datacell.emitter;
 
-create procedure datacell.mode(tbl string, mode string)
-       external name datacell.mode;
-
-create procedure datacell.protocol(tbl string, protocol string)
-       external name datacell.protocol;
-
 create procedure datacell.pause (tbl string)
     external name datacell.pause;
 
 create procedure datacell.resume (tbl string)
     external name datacell.resume;
 
+create procedure datacell.stop (tbl string)
+    external name datacell.stop;
+
 create procedure datacell.query(proc string, def string)
        external name datacell.query;
 
 create procedure datacell.query(proc string)
        external name datacell.query;
 
-create procedure datacell.remove (obj string)
-    external name datacell.remove;
-
 -- scheduler activation
 create procedure datacell.prelude()
        external name datacell.prelude;
@@ -84,12 +78,22 @@ returns boolean
 -- Inspection tables
 
 create function datacell.baskets()
-returns table( nme string, kind string, host string, portid int, status 
string, threshold int, winsize int, winstride int,  timeslice int, timestride 
int, beat int,
-       seen timestamp, cycles int, events int)
+returns table( nme string, threshold int, winsize int, winstride int,  
timeslice int, timestride int, beat int,
+       seen timestamp, events int)
 external name datacell.baskets;
 
+create function datacell.receptors()
+returns table( nme string, host string, port int, protocol string, mode 
string, status string, 
+       lastseen timestamp, cycles int, received int, pending int)
+external name datacell.receptors;
+
+create function datacell.emitters()
+returns table( nme string, host string, port int, protocol string, mode 
string, status string, 
+       lastsent timestamp, cycles int, sent int, pending int)
+external name datacell.emitters;
+
 create function datacell.queries()
-returns table( nme string, status string, seen timestamp, cycles int, events 
int, time bigint, error string, def string)
+returns table( nme string, status string, lastrun timestamp, cycles int, 
events int, time bigint, error string, def string)
 external name datacell.queries;
 
 create function datacell.errors()
diff --git a/sql/backends/monet5/datacell/Tests/scenario00.sql 
b/sql/backends/monet5/datacell/Tests/scenario00.sql
--- a/sql/backends/monet5/datacell/Tests/scenario00.sql
+++ b/sql/backends/monet5/datacell/Tests/scenario00.sql
@@ -20,9 +20,10 @@ call datacell.query('datacell.pass', 'in
 
 call datacell.resume();
 call datacell.dump();
+select * from datacell.receptors(); select * from datacell.emitters(); select 
* from datacell.queries(); select * from datacell.baskets();
 
 -- externally, activate the sensor 
---sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1 
--trace
+-- sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1 
--trace
 -- externally, activate the actuator server to listen
 -- nc -l -u 50600 
 
diff --git a/sql/backends/monet5/datacell/basket.c 
b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -33,7 +33,7 @@
 #endif
 
 str schema_default = "datacell";
-str statusname[6] = { "<unknown>", "paused", "listen", "stopped", "dropped", 
"error" };
+str statusname[6] = { "<unknown>", "init", "paused", "running", "stop", 
"error" };
 str modename[3] = { "<unknown>", "active", "passive" };
 str protocolname[4] = { "<unknown>", "TCP", "UDP", "CSV" };
 
@@ -141,6 +141,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
 
        snprintf(buf, BUFSIZ, "%s.%s", s->base.name, t->base.name);
        baskets[idx].name = GDKstrdup(buf);
+       baskets[idx].seen = * timestamp_nil;
 
        baskets[idx].colcount = 0;
        for (o = t->columns.set->h; o; o = o->next)
@@ -148,7 +149,6 @@ BSKTnewbasket(sql_schema *s, sql_table *
        baskets[idx].cols = GDKzalloc((baskets[idx].colcount + 1) * 
sizeof(str));
        baskets[idx].primary = GDKzalloc((baskets[idx].colcount + 1) * 
sizeof(BAT *));
        baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY);
-       (void) MTIMEcurrent_timestamp(&baskets[idx].seen);
 
        i = 0;
        for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) {
@@ -215,6 +215,7 @@ str BSKTlock(int *ret, str *tbl, int *de
 {
        int bskt;
 
+       *ret = 0;
        bskt = BSKTlocate(*tbl);
        if (bskt == 0)
                throw(MAL, "basket.lock", "Could not find the basket");
@@ -227,6 +228,7 @@ str BSKTlock(int *ret, str *tbl, int *de
 #endif
        (void) delay;  /* control spinlock */
        (void) ret;
+       *ret = 1;
        return MAL_SUCCEED;
 }
 
@@ -244,8 +246,8 @@ str BSKTunlock(int *ret, str *tbl)
        bskt = BSKTlocate(*tbl);
        if (bskt == 0)
                throw(MAL, "basket.lock", "Could not find the basket");
+       *ret = 0;
        MT_lock_unset(&baskets[bskt].lock, "lock basket");
-       (void) ret;
        return MAL_SUCCEED;
 }
 
@@ -577,12 +579,11 @@ BSKTbeat(int *ret, str *tbl, int *sz)
 
 /* provide a tabular view for inspection */
 str
-BSKTtable(int *nameId, int *kindId, int *hostId, int *portId, int *statusId, 
int *thresholdId, int * winsizeId, int *winstrideId, int *timesliceId, int 
*timestrideId, int *beatId, int *seenId, int *cyclesId, int *eventsId)
+BSKTtable(int *nameId, int *thresholdId, int * winsizeId, int *winstrideId, 
int *timesliceId, int *timestrideId, int *beatId, int *seenId, int *eventsId)
 {
-       BAT *name = NULL, *seen = NULL, *events = NULL, *cycles = NULL;
+       BAT *name = NULL, *seen = NULL, *events = NULL;
        BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
        BAT *timeslice = NULL, *timestride = NULL;
-       BAT *kind, *status, *port, *host;
        int i;
 
        name = BATnew(TYPE_oid, TYPE_str, BATTINY);
@@ -609,10 +610,6 @@ BSKTtable(int *nameId, int *kindId, int 
        if (seen == 0)
                goto wrapup;
        BATseqbase(seen, 0);
-       cycles = BATnew(TYPE_oid, TYPE_int, BATTINY);
-       if (cycles == 0)
-               goto wrapup;
-       BATseqbase(cycles, 0);
        events = BATnew(TYPE_oid, TYPE_int, BATTINY);
        if (events == 0)
                goto wrapup;
@@ -626,47 +623,21 @@ BSKTtable(int *nameId, int *kindId, int 
        if (timestride == 0)
                goto wrapup;
        BATseqbase(timestride, 0);
-       kind = BATnew(TYPE_oid, TYPE_str, BATTINY);
-       if (kind == 0)
-               goto wrapup;
-       BATseqbase(kind, 0);
-       status = BATnew(TYPE_oid, TYPE_str, BATTINY);
-       if (status == 0)
-               goto wrapup;
-       BATseqbase(status, 0);
-       host = BATnew(TYPE_oid, TYPE_str, BATTINY);
-       if (host == 0)
-               goto wrapup;
-       BATseqbase(host, 0);
-       port = BATnew(TYPE_oid, TYPE_int, BATTINY);
-       if (port == 0)
-               goto wrapup;
-       BATseqbase(port, 0);
-
 
        for (i = 1; i < bsktTop; i++)
                if (baskets[i].name) {
                        BUNappend(name, baskets[i].name, FALSE);
-                       BUNappend(kind, baskets[i].kind, FALSE);
-                       BUNappend(host, baskets[i].host, FALSE);
-                       BUNappend(port, &baskets[i].port, FALSE);
-                       BUNappend(status, statusname[baskets[i].status], FALSE);
                        BUNappend(threshold, &baskets[i].threshold, FALSE);
                        BUNappend(winsize, &baskets[i].winsize, FALSE);
                        BUNappend(winstride, &baskets[i].winstride, FALSE);
                        BUNappend(beat, &baskets[i].beat, FALSE);
                        BUNappend(seen, &baskets[i].seen, FALSE);
-                       BUNappend(cycles, &baskets[i].cycles, FALSE);
                        BUNappend(events, &baskets[i].events, FALSE);
                        BUNappend(timeslice, &baskets[i].timeslice, FALSE);
                        BUNappend(timestride, &baskets[i].timestride, FALSE);
                }
 
        BBPkeepref(*nameId = name->batCacheid);
-       BBPkeepref(*kindId = kind->batCacheid);
-       BBPkeepref(*hostId = host->batCacheid);
-       BBPkeepref(*portId = port->batCacheid);
-       BBPkeepref(*statusId = status->batCacheid);
        BBPkeepref(*thresholdId = threshold->batCacheid);
        BBPkeepref(*winsizeId = winsize->batCacheid);
        BBPkeepref(*winstrideId = winstride->batCacheid);
@@ -674,7 +645,6 @@ BSKTtable(int *nameId, int *kindId, int 
        BBPkeepref(*timestrideId = timestride->batCacheid);
        BBPkeepref(*beatId = beat->batCacheid);
        BBPkeepref(*seenId = seen->batCacheid);
-       BBPkeepref(*cyclesId = cycles->batCacheid);
        BBPkeepref(*eventsId = events->batCacheid);
        return MAL_SUCCEED;
 wrapup:
@@ -694,18 +664,8 @@ wrapup:
                BBPreleaseref(beat->batCacheid);
        if (seen)
                BBPreleaseref(seen->batCacheid);
-       if (cycles)
-               BBPreleaseref(cycles->batCacheid);
        if (events)
                BBPreleaseref(events->batCacheid);
-       if (kind)
-               BBPreleaseref(kind->batCacheid);
-       if (status)
-               BBPreleaseref(status->batCacheid);
-       if (host)
-               BBPreleaseref(host->batCacheid);
-       if (port)
-               BBPreleaseref(port->batCacheid);
        throw(MAL, "datacell.baskets", MAL_MALLOC_FAIL);
 }
 
diff --git a/sql/backends/monet5/datacell/basket.h 
b/sql/backends/monet5/datacell/basket.h
--- a/sql/backends/monet5/datacell/basket.h
+++ b/sql/backends/monet5/datacell/basket.h
@@ -42,18 +42,15 @@
 typedef struct{
        MT_Lock lock;
        str name;       /* table that represents the basket */
-       str kind;       /* receptor/emitter basket */
-       int status;
        int threshold ; /* bound to determine scheduling eligibility */
        int winsize, winstride; /* sliding window operations */
        lng timeslice, timestride; /* temporal sliding window, determined by 
first temporal component */
        lng beat;       /* milliseconds delay */
        int colcount;
-       str host;
-       int port;       /* port claimed */
        str *cols;
        BAT **primary;
        /* statistics */
+       int status;
        timestamp seen;
        int events; /* total number of events grabbed */
        int cycles; 
@@ -62,12 +59,14 @@ typedef struct{
 } *BSKTbasket, BSKTbasketRec;
 
 
-#define BSKTPAUSE 1       /* not active now */
-#define BSKTLISTEN 2      /* awaiting */
-#define BSKTSTOP 3        /* stop reading the channel */
-#define BSKTDROP 4        /* stop reading the channel */
+#define BSKTINIT 1        
+#define BSKTPAUSE 2       /* not active now */
+#define BSKTRUNNING 3      
+#define BSKTSTOP 4               /* stop the thread */
 #define BSKTERROR 5       /* failed to establish the stream */
 
+#define PAUSEDEFAULT 1000
+
 datacell_export str statusname[6];
 
 #define BSKTACTIVE 1      /* ask for events */
@@ -96,7 +95,7 @@ datacell_export str BSKTthreshold(int *r
 datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
 datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
 datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide);
-datacell_export str BSKTtable(int *nameId, int *kindId, int *hostId, int 
*portId, int *statusId, int *thresholdId, int * winsizeId, int *winstrideId,int 
*timesliceId, int *timestrideId, int *beatId, int *seenId, int *cyclesId, int 
*eventsId);
+datacell_export str BSKTtable(int *nameId, int *thresholdId, int * winsizeId, 
int *winstrideId,int *timesliceId, int *timestrideId, int *beatId, int *seenId, 
int *eventsId);
 datacell_export str BSKTtableerrors(int *nmeId, int *errorId);
 
 datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
diff --git a/sql/backends/monet5/datacell/datacell.c 
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -136,7 +136,7 @@ DCreceptor(Client cntxt, MalBlkPtr mb, M
        int idx = BSKTlocate(*tbl);
        if (idx == 0)
                BSKTregister(cntxt, mb, stk, pci);
-       return DCreceptorNew(ret, tbl, host, port);
+       return RCreceptorStart(ret, tbl, host, port);
 }
 
 str
@@ -149,7 +149,7 @@ DCemitter(Client cntxt, MalBlkPtr mb, Ma
        int idx = BSKTlocate(*tbl);
        if (idx == 0)
                BSKTregister(cntxt, mb, stk, pci);
-       return DCemitterNew(ret, tbl, host, port);
+       return EMemitterStart(ret, tbl, host, port);
 }
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to