Changeset: 712601163f25 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=712601163f25
Added Files:
sql/backends/monet5/datacell/Tests/TestingRecipe
Modified Files:
sql/backends/monet5/datacell/50_datacell.mal
sql/backends/monet5/datacell/50_datacell.sql
sql/backends/monet5/datacell/actuator.c
sql/backends/monet5/datacell/basket.c
sql/backends/monet5/datacell/basket.h
sql/backends/monet5/datacell/basket.mal
sql/backends/monet5/datacell/datacell.c
sql/backends/monet5/datacell/datacell.h
sql/backends/monet5/datacell/datacell.mal
sql/backends/monet5/datacell/emitter.c
sql/backends/monet5/datacell/emitter.h
sql/backends/monet5/datacell/opt_datacell.c
sql/backends/monet5/datacell/petrinet.c
sql/backends/monet5/datacell/petrinet.h
sql/backends/monet5/datacell/receptor.c
sql/backends/monet5/datacell/receptor.h
sql/backends/monet5/sql.mx
Branch: default
Log Message:
Update on datacell functionalities.
Code has been reshuffled and aligned in the basket.h files.
The default protocol has been set to receptor=TCP and
emitter=UDP
The table producing functions have been extended to ease
debugging of the datacell states.
More functional improvements coming.
diffs (truncated from 1217 to 300 lines):
diff --git a/sql/backends/monet5/datacell/50_datacell.mal
b/sql/backends/monet5/datacell/50_datacell.mal
--- a/sql/backends/monet5/datacell/50_datacell.mal
+++ b/sql/backends/monet5/datacell/50_datacell.mal
@@ -22,3 +22,4 @@ include basket;
include receptor;
include emitter;
+datacell.prelude();
diff --git a/sql/backends/monet5/datacell/50_datacell.sql
b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -84,8 +84,8 @@ returns boolean
-- Inspection tables
create function datacell.baskets()
-returns table( nme string, threshold int, winsize int, winstride int,
timeslice int, timestride int, beat int,
- seen timestamp, grabs int, events int)
+returns table( nme string, kind string, host string, portid int, status
string, threshold int, winsize int, winstride int, timeslice int, timestride
int, beat int,
+ seen timestamp, cycles int, events int)
external name datacell.baskets;
create function datacell.queries()
diff --git a/sql/backends/monet5/datacell/Tests/TestingRecipe
b/sql/backends/monet5/datacell/Tests/TestingRecipe
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/datacell/Tests/TestingRecipe
@@ -0,0 +1,17 @@
+In most cases we have to test Datacell using manual
+interactions.
+
+The key debugging operations are datacell.baskets()
+and datacell.queries();
+Furthermore some
+The scenarios contain some hints to test the datacell.
+
+A short checklist on functionality tests:
+- test datacell.dump() on non-initialized datacell
+- test if the actuator refus to contact non-started datacell
+- test pause/resume of emitter using scenario00
+- test pause/resume of query using scenarion00
+- test pause/resume of receptor
+
+- test emitter in UDP protocol (easiest)
+- test emitter in TCP protocol, actuator start first!
diff --git a/sql/backends/monet5/datacell/actuator.c
b/sql/backends/monet5/datacell/actuator.c
--- a/sql/backends/monet5/datacell/actuator.c
+++ b/sql/backends/monet5/datacell/actuator.c
@@ -77,7 +77,7 @@ ACnew(str nme)
#define TCP 1
#define UDP 2
-static int protocol = TCP;
+static int protocol = TCP; /* default protocol to use is TCP */
#define ACTIVE 1
#define PASSIVE 2
diff --git a/sql/backends/monet5/datacell/basket.c
b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -18,8 +18,7 @@
*/
/*
- * @f basket
- * @- Event baskets
+ * Event baskets
* Continuous query processing relies on event baskets
* passed through a processing pipeline. The baskets
* are derived from ordinary SQL tables where the delta
@@ -34,6 +33,9 @@
#endif
str schema_default = "datacell";
+str statusname[6] = { "<unknown>", "paused", "listen", "stopped", "dropped",
"error" };
+str modename[3] = { "<unknown>", "active", "passive" };
+str protocolname[4] = { "<unknown>", "TCP", "UDP", "CSV" };
BSKTbasketRec *baskets; /* the datacell catalog */
int bsktTop = 0, bsktLimit = 0;
@@ -115,6 +117,11 @@ BSKTlocate(str tbl)
for (i = 1; i < bsktTop; i++)
if (tbl && baskets[i].name && strcmp(tbl, baskets[i].name) == 0)
return i;
+ /* try prefixing it with datacell */
+ snprintf(buf,BUFSIZ,"datacell.%s",tbl);
+ for (i = 1; i < bsktTop; i++)
+ if (baskets[i].name && strcmp(buf, baskets[i].name) == 0)
+ return i;
return 0;
}
@@ -358,6 +365,7 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
b = BATsetaccess(b, BAT_WRITE);
BATclear(b, FALSE);
BATins(b, bn, FALSE);
+ cnt = (int) BATcount(bn);
BBPreleaseref(bn->batCacheid);
}
BBPreleaseref(bo->batCacheid);
@@ -396,13 +404,14 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
ret = (int *) getArgReference(stk, pci, i);
b = baskets[bskt].primary[i];
bn = BATcopy(b, b->htype, b->ttype, TRUE);
+ cnt = (int) BATcount(bn);
BATclear(b, FALSE);
*ret = bn->batCacheid;
BBPkeepref(*ret);
}
MT_lock_unset(&baskets[bskt].lock, "unlock basket");
}
- baskets[bskt].grabs++;
+ baskets[bskt].cycles++;
baskets[bskt].events += cnt;
return MAL_SUCCEED;
}
@@ -568,11 +577,12 @@ BSKTbeat(int *ret, str *tbl, int *sz)
/* provide a tabular view for inspection */
str
-BSKTtable(int *nameId, int *thresholdId, int * winsizeId, int *winstrideId,
int *timesliceId, int *timestrideId, int *beatId, int *seenId, int *grabsId,
int *eventsId)
+BSKTtable(int *nameId, int *kindId, int *hostId, int *portId, int *statusId,
int *thresholdId, int * winsizeId, int *winstrideId, int *timesliceId, int
*timestrideId, int *beatId, int *seenId, int *cyclesId, int *eventsId)
{
- BAT *name = NULL, *seen = NULL, *events = NULL, *grabs = NULL;
+ BAT *name = NULL, *seen = NULL, *events = NULL, *cycles = NULL;
BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
BAT *timeslice = NULL, *timestride = NULL;
+ BAT *kind, *status, *port, *host;
int i;
name = BATnew(TYPE_oid, TYPE_str, BATTINY);
@@ -599,10 +609,10 @@ BSKTtable(int *nameId, int *thresholdId,
if (seen == 0)
goto wrapup;
BATseqbase(seen, 0);
- grabs = BATnew(TYPE_oid, TYPE_int, BATTINY);
- if (grabs == 0)
+ cycles = BATnew(TYPE_oid, TYPE_int, BATTINY);
+ if (cycles == 0)
goto wrapup;
- BATseqbase(grabs, 0);
+ BATseqbase(cycles, 0);
events = BATnew(TYPE_oid, TYPE_int, BATTINY);
if (events == 0)
goto wrapup;
@@ -616,22 +626,47 @@ BSKTtable(int *nameId, int *thresholdId,
if (timestride == 0)
goto wrapup;
BATseqbase(timestride, 0);
+ kind = BATnew(TYPE_oid, TYPE_str, BATTINY);
+ if (kind == 0)
+ goto wrapup;
+ BATseqbase(kind, 0);
+ status = BATnew(TYPE_oid, TYPE_str, BATTINY);
+ if (status == 0)
+ goto wrapup;
+ BATseqbase(status, 0);
+ host = BATnew(TYPE_oid, TYPE_str, BATTINY);
+ if (host == 0)
+ goto wrapup;
+ BATseqbase(host, 0);
+ port = BATnew(TYPE_oid, TYPE_int, BATTINY);
+ if (port == 0)
+ goto wrapup;
+ BATseqbase(port, 0);
+
for (i = 1; i < bsktTop; i++)
if (baskets[i].name) {
BUNappend(name, baskets[i].name, FALSE);
+ BUNappend(kind, baskets[i].kind, FALSE);
+ BUNappend(host, baskets[i].host, FALSE);
+ BUNappend(port, &baskets[i].port, FALSE);
+ BUNappend(status, statusname[baskets[i].status], FALSE);
BUNappend(threshold, &baskets[i].threshold, FALSE);
BUNappend(winsize, &baskets[i].winsize, FALSE);
BUNappend(winstride, &baskets[i].winstride, FALSE);
BUNappend(beat, &baskets[i].beat, FALSE);
BUNappend(seen, &baskets[i].seen, FALSE);
- BUNappend(grabs, &baskets[i].grabs, FALSE);
+ BUNappend(cycles, &baskets[i].cycles, FALSE);
BUNappend(events, &baskets[i].events, FALSE);
BUNappend(timeslice, &baskets[i].timeslice, FALSE);
BUNappend(timestride, &baskets[i].timestride, FALSE);
}
BBPkeepref(*nameId = name->batCacheid);
+ BBPkeepref(*kindId = kind->batCacheid);
+ BBPkeepref(*hostId = host->batCacheid);
+ BBPkeepref(*portId = port->batCacheid);
+ BBPkeepref(*statusId = status->batCacheid);
BBPkeepref(*thresholdId = threshold->batCacheid);
BBPkeepref(*winsizeId = winsize->batCacheid);
BBPkeepref(*winstrideId = winstride->batCacheid);
@@ -639,7 +674,7 @@ BSKTtable(int *nameId, int *thresholdId,
BBPkeepref(*timestrideId = timestride->batCacheid);
BBPkeepref(*beatId = beat->batCacheid);
BBPkeepref(*seenId = seen->batCacheid);
- BBPkeepref(*grabsId = grabs->batCacheid);
+ BBPkeepref(*cyclesId = cycles->batCacheid);
BBPkeepref(*eventsId = events->batCacheid);
return MAL_SUCCEED;
wrapup:
@@ -659,10 +694,18 @@ wrapup:
BBPreleaseref(beat->batCacheid);
if (seen)
BBPreleaseref(seen->batCacheid);
- if (grabs)
- BBPreleaseref(grabs->batCacheid);
+ if (cycles)
+ BBPreleaseref(cycles->batCacheid);
if (events)
BBPreleaseref(events->batCacheid);
+ if (kind)
+ BBPreleaseref(kind->batCacheid);
+ if (status)
+ BBPreleaseref(status->batCacheid);
+ if (host)
+ BBPreleaseref(host->batCacheid);
+ if (port)
+ BBPreleaseref(port->batCacheid);
throw(MAL, "datacell.baskets", MAL_MALLOC_FAIL);
}
diff --git a/sql/backends/monet5/datacell/basket.h
b/sql/backends/monet5/datacell/basket.h
--- a/sql/backends/monet5/datacell/basket.h
+++ b/sql/backends/monet5/datacell/basket.h
@@ -42,23 +42,47 @@
typedef struct{
MT_Lock lock;
str name; /* table that represents the basket */
+ str kind; /* receptor/emitter basket */
+ int status;
int threshold ; /* bound to determine scheduling eligibility */
int winsize, winstride; /* sliding window operations */
lng timeslice, timestride; /* temporal sliding window, determined by
first temporal component */
lng beat; /* milliseconds delay */
int colcount;
+ str host;
int port; /* port claimed */
str *cols;
BAT **primary;
/* statistics */
timestamp seen;
int events; /* total number of events grabbed */
- int grabs; /* number of grabs */
+ int cycles;
/* collected errors */
BAT *errors;
} *BSKTbasket, BSKTbasketRec;
+
+#define BSKTPAUSE 1 /* not active now */
+#define BSKTLISTEN 2 /* awaiting */
+#define BSKTSTOP 3 /* stop reading the channel */
+#define BSKTDROP 4 /* stop reading the channel */
+#define BSKTERROR 5 /* failed to establish the stream */
+
+datacell_export str statusname[6];
+
+#define BSKTACTIVE 1 /* ask for events */
+#define BSKTPASSIVE 2 /* wait for events */
+
+datacell_export str modename[3];
+
+#define TCP 1
+#define UDP 2
+#define CSV 3
+
+datacell_export str protocolname[4];
+
datacell_export str schema_default;
+
datacell_export str BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str BSKTdrop(int *ret, str *tbl);
datacell_export str BSKTreset(int *ret);
@@ -72,7 +96,7 @@ datacell_export str BSKTthreshold(int *r
datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide);
-datacell_export str BSKTtable(int *nameId, int *thresholdId, int * winsizeId,
int *winstrideId,int *timesliceId, int *timestrideId, int *beatId, int *seenId,
int *grabsId, int *eventsId);
+datacell_export str BSKTtable(int *nameId, int *kindId, int *hostId, int
*portId, int *statusId, int *thresholdId, int * winsizeId, int *winstrideId,int
*timesliceId, int *timestrideId, int *beatId, int *seenId, int *cyclesId, int
*eventsId);
datacell_export str BSKTtableerrors(int *nmeId, int *errorId);
datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
diff --git a/sql/backends/monet5/datacell/basket.mal
b/sql/backends/monet5/datacell/basket.mal
--- a/sql/backends/monet5/datacell/basket.mal
+++ b/sql/backends/monet5/datacell/basket.mal
@@ -69,10 +69,6 @@ command reset():void
address BSKTreset
comment "Remove all baskets";
-command baskets():bat[:str,:bat]
-address BSKTtable
-comment "Inspect the datacell baskets";
-
command errors():bat[:str,:bat]
address BSKTtableerrors
comment "Return the table with all errors";
diff --git a/sql/backends/monet5/datacell/datacell.c
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list