Changeset: 61c2085dc8e2 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=61c2085dc8e2
Modified Files:
sql/backends/monet5/datacell/Tests/basket00.mal
sql/backends/monet5/datacell/Tests/emitter00.mal
sql/backends/monet5/datacell/basket.mx
sql/backends/monet5/datacell/datacell.mx
sql/backends/monet5/datacell/emitter.mx
sql/backends/monet5/datacell/petrinet.mx
sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:
More development support operations
Fixing the tests and adding some debugging routines.
diffs (truncated from 626 to 300 lines):
diff --git a/sql/backends/monet5/datacell/Tests/basket00.mal
b/sql/backends/monet5/datacell/Tests/basket00.mal
--- a/sql/backends/monet5/datacell/Tests/basket00.mal
+++ b/sql/backends/monet5/datacell/Tests/basket00.mal
@@ -3,13 +3,9 @@
basket.register("datacell","x");
-basket.lock("x");
+basket.lock("datacell","x");
io.print("x locked");
-basket.unlock("x");
+basket.unlock("datacell","x");
-basket.drop("x");
+basket.drop("datacell","x");
-# The SQL equivalent
-#call datacell.basket('datacell','x');
-#call datacell.lock('x');
-#call datacell.unlock('x');
diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal
b/sql/backends/monet5/datacell/Tests/emitter00.mal
--- a/sql/backends/monet5/datacell/Tests/emitter00.mal
+++ b/sql/backends/monet5/datacell/Tests/emitter00.mal
@@ -17,6 +17,7 @@
alarm.sleep(5);
emitter.pause("datacell","y");
io.print("emitter stopped");
+emitter.dump();
emitter.drop("datacell","y");
# The SQL equivalents
diff --git a/sql/backends/monet5/datacell/basket.mx
b/sql/backends/monet5/datacell/basket.mx
--- a/sql/backends/monet5/datacell/basket.mx
+++ b/sql/backends/monet5/datacell/basket.mx
@@ -40,6 +40,13 @@
address BSKTunlock
comment "Unlock the basket";
+command drop(schema:str, tbl:str):void
+address BSKTdrop
+comment "Remove the basket";
+
+command dump()
+address BSKTdump
+comment "Dump the status of the basket table";
@{
@h
#ifndef _BASKETS_
@@ -78,10 +85,11 @@
datacell_export str schema_default;
datacell_export str BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str BSKTnewbasket(sql_schema *s, sql_table *t, sql_trans *tr);
-datacell_export str BSKTderegister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+datacell_export str BSKTdrop(int *ret, str *sch, str *tbl);
datacell_export str BSKTinventory(int *ret);
datacell_export int BSKTmemberCount(str schema, str tbl);
datacell_export int BSKTlocate(str schema, str tbl);
+datacell_export str BSKTdump(int *ret);
datacell_export str BSKTlock(int *ret, str *schema, str *tbl, int *delay);
datacell_export str BSKTunlock(int *ret, str *schema, str *tbl);
@@ -111,7 +119,6 @@
This is not produced by GDKusec, which returns microseconds
since the start of the program.
Notice that this routine consumes noticable time.
-@c
lng usec(void)
{
struct timeval tp;
@@ -119,9 +126,16 @@
gettimeofday(&tp, NULL);
return ((lng)tp.tv_sec) * LL_CONSTANT(1000000) + (lng)tp.tv_usec;
}
+@c
static int BSKTnewEntry()
{
+ int i;
+ for ( i=0; i< bsktLimit; i++)
+ if (baskets[i].schema == NULL)
+ break;
+ if ( i < bsktLimit)
+ return i;
if (bsktLimit == 0) {
bsktLimit = MAXBSK;
baskets = (BSKTbasketRec *)GDKzalloc(bsktLimit *
sizeof(BSKTbasketRec));
@@ -193,10 +207,11 @@
sql_table *t;
mvc *m = NULL;
str schema, tbl, msg = getContext(cntxt, mb, &m, NULL);
- sql_trans *tr = m->session->tr;
+ sql_trans *tr;
if (msg != MAL_SUCCEED)
return msg;
+ tr = m->session->tr;
s = mvc_bind_schema(m, schema = *(str*)getArgReference(stk, pci, 1));
tbl = *(str*)getArgReference(stk, pci, 2);
if (s == NULL)
@@ -254,10 +269,49 @@
bskt = BSKTlocate(*schema, *tbl);
if (bskt == 0)
- throw(MAL, "basket.lock", "Could not find the basket group");
+ throw(MAL, "basket.lock", "Could not find the basket");
mal_unset_lock(baskets[bskt].lock, "lock basket");
(void)ret;
return MAL_SUCCEED;
}
+
+str
+BSKTdrop(int *ret, str *schema, str *tbl)
+{
+ int bskt;
+
+ bskt = BSKTlocate(*schema, *tbl);
+ if (bskt == 0)
+ throw(MAL, "basket.drop", "Could not find the basket");
+ baskets[bskt].colcount = 0;
+ GDKfree(baskets[bskt].schema);
+ GDKfree(baskets[bskt].name);
+ GDKfree(baskets[bskt].cols);
+ GDKfree(baskets[bskt].primary);
+ baskets[bskt].schema = 0;
+ baskets[bskt].name = 0;
+ baskets[bskt].cols = 0;
+ baskets[bskt].primary = 0;
+
+ (void)ret;
+ return MAL_SUCCEED;
+}
+
+str
+BSKTdump(int *ret)
+{
+ int bskt;
+
+ for ( bskt = 0; bskt < bsktLimit; bskt++)
+ if ( baskets[bskt].schema){
+ mnstr_printf(GDKout, "#baskets[%2d] %s.%s count %d\n", bskt,
+ baskets[bskt].schema,
+ baskets[bskt].name,
+ baskets[bskt].colcount);
+ }
+ (void)ret;
+ return MAL_SUCCEED;
+}
+
@}
diff --git a/sql/backends/monet5/datacell/datacell.mx
b/sql/backends/monet5/datacell/datacell.mx
--- a/sql/backends/monet5/datacell/datacell.mx
+++ b/sql/backends/monet5/datacell/datacell.mx
@@ -24,10 +24,6 @@
address DCregister
comment "Initialize a new basket based on a specific table definition in the
datacell schema");
-command inventory():bat[:str,:bat]
-address DCinventory
-comment "Produce a tabular view of the datacell streaming components";
-
command emitter(sch:str, tbl:str, host:str, port:int, protocol:str)
address DCemitter
comment "define a emitter based on a basket table. The emitter protocol is
either active or passive.";
@@ -52,13 +48,29 @@
address DCquery
comment "Add a new continuous query.";
-pattern run()
-address DCrun
+pattern start()
+address DCstartScheduler
+comment "Convert the datacell schema to a stream processing infrastructure";
+
+pattern pause()
+address DCrunScheduler
comment "(Re)start the petrinet scheduler.";
+pattern resume()
+address DCresumeScheduler
+comment "Resume the petrinet scheduler.";
+
pattern stop()
-address DCrun
+address DCstopScheduler
comment "Stop the petrinet scheduler.";
+
+command status():bat[:str,:bat]
+address DCstatus
+comment "Produce a tabular view of the datacell streaming components";
+
+command dump()
+address DCdump
+comment "Dump receptor/emitter status";
@{
@h
#ifndef _DATACELLS_
@@ -69,6 +81,9 @@
#include "mal_interpreter.h"
#include "sql.h"
#include "basket.h"
+#include "receptor.h"
+#include "emitter.h"
+#include "petrinet.h"
#ifdef WIN32
#ifndef LIBCONTAINERS
@@ -82,19 +97,23 @@
/* #define _DEBUG_DATACELL debug this module */
-datacell_export str DCregister_all(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCdrop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
datacell_export str DCpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
-datacell_export str DCrun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
-datacell_export str DCstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
-datacell_export str DCinventory(int *ret);
+datacell_export str DCdump(int *ret);
+
+datacell_export str DCstartScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+datacell_export str DCpauseScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+datacell_export str DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+datacell_export str DCstopScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+
+datacell_export str DCstatus(int *ret);
#endif
@c
-#include "basket.h"
+#include "datacell.h"
#ifdef WIN32
#include "winsock2.h"
#endif
@@ -108,52 +127,41 @@
str
DCpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
+ int idx, ret=0;
+ str schema= *(str*) getArgReference(stk, pci,1);
+ str tbl= *(str*) getArgReference(stk, pci,2);
+
+ idx = BSKTlocate(schema,tbl);
+ if ( idx == 0)
+ throw(SQL, "datacell.pause", "Basket not found");
+
+ DCreceptorPause(&ret, &schema, &tbl);
+ DCemitterPause(&ret, &schema, &tbl);
(void) cntxt;
(void) mb;
- (void) stk;
- (void) pci;
return MAL_SUCCEED;
}
str
DCresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
+ int idx, ret= 0;
+ str schema= *(str*) getArgReference(stk, pci,1);
+ str tbl= *(str*) getArgReference(stk, pci,2);
+
+ idx = BSKTlocate(schema,tbl);
+ if ( idx == 0)
+ throw(SQL, "datacell.pause", "Basket not found");
+
+ DCreceptorPause(&ret, &schema, &tbl);
+ DCemitterPause(&ret, &schema, &tbl);
(void) cntxt;
(void) mb;
- (void) stk;
- (void) pci;
return MAL_SUCCEED;
}
-/* initialize all components of the datacell schema */
str
-DCregister_all(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- sql_schema *s;
- sql_table *t;
- mvc *m = NULL;
- str msg = getContext(cntxt, mb, &m, NULL);
- sql_trans *tr = m->session->tr;
- node *o;
-
- (void)stk;
- (void)pci;
- s = mvc_bind_schema(m, schema_default);
- if (s == NULL)
- throw(SQL, "datacell.register", "Schema missing");
- for (o = s->tables.set->h; msg == MAL_SUCCEED && o; o = o->next) {
- t = o->data;
-
- /* check double registration */
- if (BSKTlocate(schema_default, t->base.name))
- throw(SQL, "datacell.register", "Basket defined
twice.");
- msg = BSKTnewbasket(s, t, tr);
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list