Changeset: 5d6709b593e5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5d6709b593e5
Modified Files:
sql/backends/monet5/datacell/50_datacell.sql
sql/backends/monet5/datacell/Tests/scenario00.sql
sql/backends/monet5/datacell/basket.c
sql/backends/monet5/datacell/basket.h
sql/backends/monet5/datacell/datacell.c
sql/backends/monet5/datacell/datacell.h
sql/backends/monet5/datacell/datacell.mal
sql/backends/monet5/datacell/emitter.c
sql/backends/monet5/datacell/emitter.h
sql/backends/monet5/datacell/emitter.mal
sql/backends/monet5/datacell/petrinet.c
sql/backends/monet5/datacell/receptor.c
sql/backends/monet5/datacell/receptor.h
sql/backends/monet5/datacell/receptor.mal
Branch: default
Log Message:
Simplifying code base
The interface has been simplified. The receptors, emitters, basket
and queries status can be inspected through a materialized view.
diffs (truncated from 1979 to 300 lines):
diff --git a/sql/backends/monet5/datacell/50_datacell.sql
b/sql/backends/monet5/datacell/50_datacell.sql
--- a/sql/backends/monet5/datacell/50_datacell.sql
+++ b/sql/backends/monet5/datacell/50_datacell.sql
@@ -26,27 +26,21 @@ create procedure datacell.receptor(tbl s
create procedure datacell.emitter(tbl string, host string, portid integer)
external name datacell.emitter;
-create procedure datacell.mode(tbl string, mode string)
- external name datacell.mode;
-
-create procedure datacell.protocol(tbl string, protocol string)
- external name datacell.protocol;
-
create procedure datacell.pause (tbl string)
external name datacell.pause;
create procedure datacell.resume (tbl string)
external name datacell.resume;
+create procedure datacell.stop (tbl string)
+ external name datacell.stop;
+
create procedure datacell.query(proc string, def string)
external name datacell.query;
create procedure datacell.query(proc string)
external name datacell.query;
-create procedure datacell.remove (obj string)
- external name datacell.remove;
-
-- scheduler activation
create procedure datacell.prelude()
external name datacell.prelude;
@@ -84,12 +78,22 @@ returns boolean
-- Inspection tables
create function datacell.baskets()
-returns table( nme string, kind string, host string, portid int, status
string, threshold int, winsize int, winstride int, timeslice int, timestride
int, beat int,
- seen timestamp, cycles int, events int)
+returns table( nme string, threshold int, winsize int, winstride int,
timeslice int, timestride int, beat int,
+ seen timestamp, events int)
external name datacell.baskets;
+create function datacell.receptors()
+returns table( nme string, host string, port int, protocol string, mode
string, status string,
+ lastseen timestamp, cycles int, received int, pending int)
+external name datacell.receptors;
+
+create function datacell.emitters()
+returns table( nme string, host string, port int, protocol string, mode
string, status string,
+ lastsent timestamp, cycles int, sent int, pending int)
+external name datacell.emitters;
+
create function datacell.queries()
-returns table( nme string, status string, seen timestamp, cycles int, events
int, time bigint, error string, def string)
+returns table( nme string, status string, lastrun timestamp, cycles int,
events int, time bigint, error string, def string)
external name datacell.queries;
create function datacell.errors()
diff --git a/sql/backends/monet5/datacell/Tests/scenario00.sql
b/sql/backends/monet5/datacell/Tests/scenario00.sql
--- a/sql/backends/monet5/datacell/Tests/scenario00.sql
+++ b/sql/backends/monet5/datacell/Tests/scenario00.sql
@@ -20,9 +20,10 @@ call datacell.query('datacell.pass', 'in
call datacell.resume();
call datacell.dump();
+select * from datacell.receptors(); select * from datacell.emitters(); select
* from datacell.queries(); select * from datacell.baskets();
-- externally, activate the sensor
---sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1
--trace
+-- sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1
--trace
-- externally, activate the actuator server to listen
-- nc -l -u 50600
diff --git a/sql/backends/monet5/datacell/basket.c
b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -33,7 +33,7 @@
#endif
str schema_default = "datacell";
-str statusname[6] = { "<unknown>", "paused", "listen", "stopped", "dropped",
"error" };
+str statusname[6] = { "<unknown>", "init", "paused", "running", "stop",
"error" };
str modename[3] = { "<unknown>", "active", "passive" };
str protocolname[4] = { "<unknown>", "TCP", "UDP", "CSV" };
@@ -141,6 +141,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
snprintf(buf, BUFSIZ, "%s.%s", s->base.name, t->base.name);
baskets[idx].name = GDKstrdup(buf);
+ baskets[idx].seen = * timestamp_nil;
baskets[idx].colcount = 0;
for (o = t->columns.set->h; o; o = o->next)
@@ -148,7 +149,6 @@ BSKTnewbasket(sql_schema *s, sql_table *
baskets[idx].cols = GDKzalloc((baskets[idx].colcount + 1) *
sizeof(str));
baskets[idx].primary = GDKzalloc((baskets[idx].colcount + 1) *
sizeof(BAT *));
baskets[idx].errors = BATnew(TYPE_void, TYPE_str, BATTINY);
- (void) MTIMEcurrent_timestamp(&baskets[idx].seen);
i = 0;
for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) {
@@ -215,6 +215,7 @@ str BSKTlock(int *ret, str *tbl, int *de
{
int bskt;
+ *ret = 0;
bskt = BSKTlocate(*tbl);
if (bskt == 0)
throw(MAL, "basket.lock", "Could not find the basket");
@@ -227,6 +228,7 @@ str BSKTlock(int *ret, str *tbl, int *de
#endif
(void) delay; /* control spinlock */
(void) ret;
+ *ret = 1;
return MAL_SUCCEED;
}
@@ -244,8 +246,8 @@ str BSKTunlock(int *ret, str *tbl)
bskt = BSKTlocate(*tbl);
if (bskt == 0)
throw(MAL, "basket.lock", "Could not find the basket");
+ *ret = 0;
MT_lock_unset(&baskets[bskt].lock, "lock basket");
- (void) ret;
return MAL_SUCCEED;
}
@@ -577,12 +579,11 @@ BSKTbeat(int *ret, str *tbl, int *sz)
/* provide a tabular view for inspection */
str
-BSKTtable(int *nameId, int *kindId, int *hostId, int *portId, int *statusId,
int *thresholdId, int * winsizeId, int *winstrideId, int *timesliceId, int
*timestrideId, int *beatId, int *seenId, int *cyclesId, int *eventsId)
+BSKTtable(int *nameId, int *thresholdId, int * winsizeId, int *winstrideId,
int *timesliceId, int *timestrideId, int *beatId, int *seenId, int *eventsId)
{
- BAT *name = NULL, *seen = NULL, *events = NULL, *cycles = NULL;
+ BAT *name = NULL, *seen = NULL, *events = NULL;
BAT *threshold = NULL, *winsize = NULL, *winstride = NULL, *beat = NULL;
BAT *timeslice = NULL, *timestride = NULL;
- BAT *kind, *status, *port, *host;
int i;
name = BATnew(TYPE_oid, TYPE_str, BATTINY);
@@ -609,10 +610,6 @@ BSKTtable(int *nameId, int *kindId, int
if (seen == 0)
goto wrapup;
BATseqbase(seen, 0);
- cycles = BATnew(TYPE_oid, TYPE_int, BATTINY);
- if (cycles == 0)
- goto wrapup;
- BATseqbase(cycles, 0);
events = BATnew(TYPE_oid, TYPE_int, BATTINY);
if (events == 0)
goto wrapup;
@@ -626,47 +623,21 @@ BSKTtable(int *nameId, int *kindId, int
if (timestride == 0)
goto wrapup;
BATseqbase(timestride, 0);
- kind = BATnew(TYPE_oid, TYPE_str, BATTINY);
- if (kind == 0)
- goto wrapup;
- BATseqbase(kind, 0);
- status = BATnew(TYPE_oid, TYPE_str, BATTINY);
- if (status == 0)
- goto wrapup;
- BATseqbase(status, 0);
- host = BATnew(TYPE_oid, TYPE_str, BATTINY);
- if (host == 0)
- goto wrapup;
- BATseqbase(host, 0);
- port = BATnew(TYPE_oid, TYPE_int, BATTINY);
- if (port == 0)
- goto wrapup;
- BATseqbase(port, 0);
-
for (i = 1; i < bsktTop; i++)
if (baskets[i].name) {
BUNappend(name, baskets[i].name, FALSE);
- BUNappend(kind, baskets[i].kind, FALSE);
- BUNappend(host, baskets[i].host, FALSE);
- BUNappend(port, &baskets[i].port, FALSE);
- BUNappend(status, statusname[baskets[i].status], FALSE);
BUNappend(threshold, &baskets[i].threshold, FALSE);
BUNappend(winsize, &baskets[i].winsize, FALSE);
BUNappend(winstride, &baskets[i].winstride, FALSE);
BUNappend(beat, &baskets[i].beat, FALSE);
BUNappend(seen, &baskets[i].seen, FALSE);
- BUNappend(cycles, &baskets[i].cycles, FALSE);
BUNappend(events, &baskets[i].events, FALSE);
BUNappend(timeslice, &baskets[i].timeslice, FALSE);
BUNappend(timestride, &baskets[i].timestride, FALSE);
}
BBPkeepref(*nameId = name->batCacheid);
- BBPkeepref(*kindId = kind->batCacheid);
- BBPkeepref(*hostId = host->batCacheid);
- BBPkeepref(*portId = port->batCacheid);
- BBPkeepref(*statusId = status->batCacheid);
BBPkeepref(*thresholdId = threshold->batCacheid);
BBPkeepref(*winsizeId = winsize->batCacheid);
BBPkeepref(*winstrideId = winstride->batCacheid);
@@ -674,7 +645,6 @@ BSKTtable(int *nameId, int *kindId, int
BBPkeepref(*timestrideId = timestride->batCacheid);
BBPkeepref(*beatId = beat->batCacheid);
BBPkeepref(*seenId = seen->batCacheid);
- BBPkeepref(*cyclesId = cycles->batCacheid);
BBPkeepref(*eventsId = events->batCacheid);
return MAL_SUCCEED;
wrapup:
@@ -694,18 +664,8 @@ wrapup:
BBPreleaseref(beat->batCacheid);
if (seen)
BBPreleaseref(seen->batCacheid);
- if (cycles)
- BBPreleaseref(cycles->batCacheid);
if (events)
BBPreleaseref(events->batCacheid);
- if (kind)
- BBPreleaseref(kind->batCacheid);
- if (status)
- BBPreleaseref(status->batCacheid);
- if (host)
- BBPreleaseref(host->batCacheid);
- if (port)
- BBPreleaseref(port->batCacheid);
throw(MAL, "datacell.baskets", MAL_MALLOC_FAIL);
}
diff --git a/sql/backends/monet5/datacell/basket.h
b/sql/backends/monet5/datacell/basket.h
--- a/sql/backends/monet5/datacell/basket.h
+++ b/sql/backends/monet5/datacell/basket.h
@@ -42,18 +42,15 @@
typedef struct{
MT_Lock lock;
str name; /* table that represents the basket */
- str kind; /* receptor/emitter basket */
- int status;
int threshold ; /* bound to determine scheduling eligibility */
int winsize, winstride; /* sliding window operations */
lng timeslice, timestride; /* temporal sliding window, determined by
first temporal component */
lng beat; /* milliseconds delay */
int colcount;
- str host;
- int port; /* port claimed */
str *cols;
BAT **primary;
/* statistics */
+ int status;
timestamp seen;
int events; /* total number of events grabbed */
int cycles;
@@ -62,12 +59,14 @@ typedef struct{
} *BSKTbasket, BSKTbasketRec;
-#define BSKTPAUSE 1 /* not active now */
-#define BSKTLISTEN 2 /* awaiting */
-#define BSKTSTOP 3 /* stop reading the channel */
-#define BSKTDROP 4 /* stop reading the channel */
+#define BSKTINIT 1
+#define BSKTPAUSE 2 /* not active now */
+#define BSKTRUNNING 3
+#define BSKTSTOP 4 /* stop the thread */
#define BSKTERROR 5 /* failed to establish the stream */
+#define PAUSEDEFAULT 1000
+
datacell_export str statusname[6];
#define BSKTACTIVE 1 /* ask for events */
@@ -96,7 +95,7 @@ datacell_export str BSKTthreshold(int *r
datacell_export str BSKTbeat(int *ret, str *tbl, int *sz);
datacell_export str BSKTwindow(int *ret, str *tbl, int *sz, int *slide);
datacell_export str BSKTtimewindow(int *ret, str *tbl, int *sz, int *slide);
-datacell_export str BSKTtable(int *nameId, int *kindId, int *hostId, int
*portId, int *statusId, int *thresholdId, int * winsizeId, int *winstrideId,int
*timesliceId, int *timestrideId, int *beatId, int *seenId, int *cyclesId, int
*eventsId);
+datacell_export str BSKTtable(int *nameId, int *thresholdId, int * winsizeId,
int *winstrideId,int *timesliceId, int *timestrideId, int *beatId, int *seenId,
int *eventsId);
datacell_export str BSKTtableerrors(int *nmeId, int *errorId);
datacell_export str BSKTlock(int *ret, str *tbl, int *delay);
diff --git a/sql/backends/monet5/datacell/datacell.c
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -136,7 +136,7 @@ DCreceptor(Client cntxt, MalBlkPtr mb, M
int idx = BSKTlocate(*tbl);
if (idx == 0)
BSKTregister(cntxt, mb, stk, pci);
- return DCreceptorNew(ret, tbl, host, port);
+ return RCreceptorStart(ret, tbl, host, port);
}
str
@@ -149,7 +149,7 @@ DCemitter(Client cntxt, MalBlkPtr mb, Ma
int idx = BSKTlocate(*tbl);
if (idx == 0)
BSKTregister(cntxt, mb, stk, pci);
- return DCemitterNew(ret, tbl, host, port);
+ return EMemitterStart(ret, tbl, host, port);
}
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list