Changeset: 712601163f25 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=712601163f25
Added Files:
        sql/backends/monet5/datacell/Tests/TestingRecipe
Modified Files:
        sql/backends/monet5/datacell/50_datacell.mal
        sql/backends/monet5/datacell/50_datacell.sql
        sql/backends/monet5/datacell/actuator.c
        sql/backends/monet5/datacell/basket.c
        sql/backends/monet5/datacell/basket.h
        sql/backends/monet5/datacell/basket.mal
        sql/backends/monet5/datacell/datacell.c
        sql/backends/monet5/datacell/datacell.h
        sql/backends/monet5/datacell/datacell.mal
        sql/backends/monet5/datacell/emitter.c
        sql/backends/monet5/datacell/emitter.h
        sql/backends/monet5/datacell/opt_datacell.c
        sql/backends/monet5/datacell/petrinet.c
        sql/backends/monet5/datacell/petrinet.h
        sql/backends/monet5/datacell/receptor.c
        sql/backends/monet5/datacell/receptor.h
        sql/backends/monet5/sql.mx
Branch: default
Log Message:

Update on datacell functionalities.
Code has been reshuffled and aligned in the basket.h files.
The default protocol has been set to receptor=TCP and
emitter=UDP
The table producing functions have been extended to ease
debugging of the datacell states.
More functional improvements coming.


diffs (truncated from 1217 to 300 lines):

diff --git a/sql/backends/monet5/datacell/50_datacell.mal 
b/sql/backends/monet5/datacell/50_datacell.mal
--- a/sql/backends/monet5/datacell/50_datacell.mal
+++ b/sql/backends/monet5/datacell/50_datacell.mal
@@ -22,3 +22,4 @@ include basket;
 include receptor;
 include emitter;
 
+datacell.prelude();
diff --git a/sql/backends/monet5/datacell/50_datacell.sql 
b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -84,8 +84,8 @@ returns boolean
 -- Inspection tables
 
 create function datacell.baskets()
-returns table( nme string, threshold int, winsize int, winstride int,  
timeslice int, timestride int, beat int,
-       seen timestamp, grabs int, events int)
+returns table( nme string, kind string, host string, portid int, status 
string, threshold int, winsize int, winstride int,  timeslice int, timestride 
int, beat int,
+       seen timestamp, cycles int, events int)
 external name datacell.baskets;
 
 create function datacell.queries()
diff --git a/sql/backends/monet5/datacell/Tests/TestingRecipe 
b/sql/backends/monet5/datacell/Tests/TestingRecipe
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/datacell/Tests/TestingRecipe
@@ -0,0 +1,17 @@
+In most cases we have to test Datacell using manual
+interactions.
+
+The key debugging operations are datacell.baskets()
+and datacell.queries();
+Furthermore some
+The scenarios contain some hints to test the datacell.
+
+A short checklist on functionality tests:
+- test datacell.dump() on non-initialized datacell
+- test if the actuator refus to contact non-started datacell
+- test pause/resume of emitter using scenario00
+- test pause/resume of query using scenarion00
+- test pause/resume of receptor
+
+- test emitter in UDP protocol (easiest)
+- test emitter in TCP protocol, actuator start first!
diff --git a/sql/backends/monet5/datacell/actuator.c 
b/sql/backends/monet5/datacell/actuator.c
--- a/sql/backends/monet5/datacell/actuator.c
+++ b/sql/backends/monet5/datacell/actuator.c
@@ -77,7 +77,7 @@ ACnew(str nme)
 
 #define TCP 1
 #define UDP 2
-static int protocol = TCP;
+static int protocol = TCP; /* default protocol to use is TCP */
 
 #define ACTIVE 1
 #define PASSIVE 2
diff --git a/sql/backends/monet5/datacell/basket.c 
b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -18,8 +18,7 @@
  */
 
 /*
- * @f basket
- * @- Event baskets
+ * Event baskets
  * Continuous query processing relies on event baskets
  * passed through a processing pipeline. The baskets
  * are derived from ordinary SQL tables where the delta
@@ -34,6 +33,9 @@
 #endif
 
 str schema_default = "datacell";
+str statusname[6] = { "<unknown>", "paused", "listen", "stopped", "dropped", 
"error" };
+str modename[3] = { "<unknown>", "active", "passive" };
+str protocolname[4] = { "<unknown>", "TCP", "UDP", "CSV" };
 
 BSKTbasketRec *baskets;   /* the datacell catalog */
 int bsktTop = 0, bsktLimit = 0;
@@ -115,6 +117,11 @@ BSKTlocate(str tbl)
        for (i = 1; i < bsktTop; i++)
                if (tbl && baskets[i].name && strcmp(tbl, baskets[i].name) == 0)
                        return i;
+       /* try prefixing it with datacell */
+       snprintf(buf,BUFSIZ,"datacell.%s",tbl);
+       for (i = 1; i < bsktTop; i++)
+               if (baskets[i].name && strcmp(buf, baskets[i].name) == 0)
+                       return i;
        return 0;
 }
 
@@ -358,6 +365,7 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
                        b = BATsetaccess(b, BAT_WRITE);
                        BATclear(b, FALSE);
                        BATins(b, bn, FALSE);
+                       cnt = (int) BATcount(bn);
                        BBPreleaseref(bn->batCacheid);
                }
                BBPreleaseref(bo->batCacheid);
@@ -396,13 +404,14 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
                        ret = (int *) getArgReference(stk, pci, i);
                        b = baskets[bskt].primary[i];
                        bn = BATcopy(b, b->htype, b->ttype, TRUE);
+                       cnt = (int) BATcount(bn);
                        BATclear(b, FALSE);
                        *ret = bn->batCacheid;
                        BBPkeepref(*ret);
                }
                MT_lock_unset(&baskets[bskt].lock, "unlock basket");
        }
-       baskets[bskt].grabs++;
+       baskets[bskt].cycles++;
        baskets[bskt].events += cnt;
        return MAL_SUCCEED;
 }
@@ -568,11 +577,12 @@ BSKTbeat(int *ret, str *tbl, int *sz)
 
 /* provide a tabular view for inspection */
 str
-BSKTtable(int *nameId, int *thresholdId, int * winsizeId, int *winstrideId, 
int *timesliceId, int *timestrideId, int *beatId, int *seenId, int *grabsId, 
int *eventsId)
+BSKTtable(int *nameId, int *kindId, int *hostId, int *portId, int *statusId, 
int *thresholdId, int * winsizeId, int *winstrideId, int *timesliceId, int 
*timestrideId, int *beatId, int *seenId, int *cyclesId, int *eventsId)
 {
-       BAT *name = NULL, *seen = NULL, *events = NULL, *grabs = NULL;
+       BAT *name = NULL, *seen = NULL, *events = NULL, *cycles = NULL;
        BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
        BAT *timeslice = NULL, *timestride = NULL;
+       BAT *kind, *status, *port, *host;
        int i;
 
        name = BATnew(TYPE_oid, TYPE_str, BATTINY);
@@ -599,10 +609,10 @@ BSKTtable(int *nameId, int *thresholdId,
        if (seen == 0)
                goto wrapup;
        BATseqbase(seen, 0);
-       grabs = BATnew(TYPE_oid, TYPE_int, BATTINY);
-       if (grabs == 0)
+       cycles = BATnew(TYPE_oid, TYPE_int, BATTINY);
+       if (cycles == 0)
                goto wrapup;
-       BATseqbase(grabs, 0);
+       BATseqbase(cycles, 0);
        events = BATnew(TYPE_oid, TYPE_int, BATTINY);
        if (events == 0)
                goto wrapup;
@@ -616,22 +626,47 @@ BSKTtable(int *nameId, int *thresholdId,
        if (timestride == 0)
                goto wrapup;
        BATseqbase(timestride, 0);
+       kind = BATnew(TYPE_oid, TYPE_str, BATTINY);
+       if (kind == 0)
+               goto wrapup;
+       BATseqbase(kind, 0);
+       status = BATnew(TYPE_oid, TYPE_str, BATTINY);
+       if (status == 0)
+               goto wrapup;
+       BATseqbase(status, 0);
+       host = BATnew(TYPE_oid, TYPE_str, BATTINY);
+       if (host == 0)
+               goto wrapup;
+       BATseqbase(host, 0);
+       port = BATnew(TYPE_oid, TYPE_int, BATTINY);
+       if (port == 0)
+               goto wrapup;
+       BATseqbase(port, 0);
+
 
        for (i = 1; i < bsktTop; i++)
                if (baskets[i].name) {
                        BUNappend(name, baskets[i].name, FALSE);
+                       BUNappend(kind, baskets[i].kind, FALSE);
+                       BUNappend(host, baskets[i].host, FALSE);
+                       BUNappend(port, &baskets[i].port, FALSE);
+                       BUNappend(status, statusname[baskets[i].status], FALSE);
                        BUNappend(threshold, &baskets[i].threshold, FALSE);
                        BUNappend(winsize, &baskets[i].winsize, FALSE);
                        BUNappend(winstride, &baskets[i].winstride, FALSE);
                        BUNappend(beat, &baskets[i].beat, FALSE);
                        BUNappend(seen, &baskets[i].seen, FALSE);
-                       BUNappend(grabs, &baskets[i].grabs, FALSE);
+                       BUNappend(cycles, &baskets[i].cycles, FALSE);
                        BUNappend(events, &baskets[i].events, FALSE);
                        BUNappend(timeslice, &baskets[i].timeslice, FALSE);
                        BUNappend(timestride, &baskets[i].timestride, FALSE);
                }
 
        BBPkeepref(*nameId = name->batCacheid);
+       BBPkeepref(*kindId = kind->batCacheid);
+       BBPkeepref(*hostId = host->batCacheid);
+       BBPkeepref(*portId = port->batCacheid);
+       BBPkeepref(*statusId = status->batCacheid);
        BBPkeepref(*thresholdId = threshold->batCacheid);
        BBPkeepref(*winsizeId = winsize->batCacheid);
        BBPkeepref(*winstrideId = winstride->batCacheid);
@@ -639,7 +674,7 @@ BSKTtable(int *nameId, int *thresholdId,
        BBPkeepref(*timestrideId = timestride->batCacheid);
        BBPkeepref(*beatId = beat->batCacheid);
        BBPkeepref(*seenId = seen->batCacheid);
-       BBPkeepref(*grabsId = grabs->batCacheid);
+       BBPkeepref(*cyclesId = cycles->batCacheid);
        BBPkeepref(*eventsId = events->batCacheid);
        return MAL_SUCCEED;
 wrapup:
@@ -659,10 +694,18 @@ wrapup:
                BBPreleaseref(beat->batCacheid);
        if (seen)
                BBPreleaseref(seen->batCacheid);
-       if (grabs)
-               BBPreleaseref(grabs->batCacheid);
+       if (cycles)
+               BBPreleaseref(cycles->batCacheid);
        if (events)
                BBPreleaseref(events->batCacheid);
+       if (kind)
+               BBPreleaseref(kind->batCacheid);
+       if (status)
+               BBPreleaseref(status->batCacheid);
+       if (host)
+               BBPreleaseref(host->batCacheid);
+       if (port)
+               BBPreleaseref(port->batCacheid);
        throw(MAL, "datacell.baskets", MAL_MALLOC_FAIL);
 }
 
diff --git a/sql/backends/monet5/datacell/basket.h 
b/sql/backends/monet5/datacell/basket.h
--- a/sql/backends/monet5/datacell/basket.h
+++ b/sql/backends/monet5/datacell/basket.h
@@ -42,23 +42,47 @@
 typedef struct{
        MT_Lock lock;
        str name;       /* table that represents the basket */
+       str kind;       /* receptor/emitter basket */
+       int status;
        int threshold ; /* bound to determine scheduling eligibility */
        int winsize, winstride; /* sliding window operations */
        lng timeslice, timestride; /* temporal sliding window, determined by 
first temporal component */
        lng beat;       /* milliseconds delay */
        int colcount;
+       str host;
        int port;       /* port claimed */
        str *cols;
        BAT **primary;
        /* statistics */
        timestamp seen;
        int events; /* total number of events grabbed */
-       int grabs; /* number of grabs */
+       int cycles; 
        /* collected errors */
        BAT *errors;
 } *BSKTbasket, BSKTbasketRec;
 
+
+#define BSKTPAUSE 1       /* not active now */
+#define BSKTLISTEN 2      /* awaiting */
+#define BSKTSTOP 3        /* stop reading the channel */
+#define BSKTDROP 4        /* stop reading the channel */
+#define BSKTERROR 5       /* failed to establish the stream */
+
+datacell_export str statusname[6];
+
+#define BSKTACTIVE 1      /* ask for events */
+#define BSKTPASSIVE 2     /* wait for events */
+
+datacell_export str modename[3];
+
+#define TCP 1
+#define UDP 2
+#define CSV 3
+
+datacell_export str protocolname[4];
+
 datacell_export str schema_default;
+
 datacell_export str BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 datacell_export str BSKTdrop(int *ret, str *tbl);
 datacell_export str BSKTreset(int *ret);
@@ -72,7 +96,7 @@ datacell_export str BSKTthreshold(int *r
 datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
 datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
 datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide);
-datacell_export str BSKTtable(int *nameId, int *thresholdId, int * winsizeId, 
int *winstrideId,int *timesliceId, int *timestrideId, int *beatId, int *seenId, 
int *grabsId, int *eventsId);
+datacell_export str BSKTtable(int *nameId, int *kindId, int *hostId, int 
*portId, int *statusId, int *thresholdId, int * winsizeId, int *winstrideId,int 
*timesliceId, int *timestrideId, int *beatId, int *seenId, int *cyclesId, int 
*eventsId);
 datacell_export str BSKTtableerrors(int *nmeId, int *errorId);
 
 datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
diff --git a/sql/backends/monet5/datacell/basket.mal 
b/sql/backends/monet5/datacell/basket.mal
--- a/sql/backends/monet5/datacell/basket.mal
+++ b/sql/backends/monet5/datacell/basket.mal
@@ -69,10 +69,6 @@ command reset():void
 address BSKTreset
 comment "Remove all baskets";
 
-command baskets():bat[:str,:bat]
-address BSKTtable
-comment "Inspect the datacell baskets";
-
 command errors():bat[:str,:bat]
 address BSKTtableerrors
 comment "Return the table with all errors";
diff --git a/sql/backends/monet5/datacell/datacell.c 
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to