Changeset: 34fa674749ee for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=34fa674749ee
Modified Files:
monetdb5/modules/mal/mal_init.mx
sql/backends/monet5/datacell/Tests/receptor00.mal
sql/backends/monet5/datacell/datacell.sql
sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:
Redesign of receptor module
The design is reduced to a simple start-{pause,resume} - drop scheme.
The notion of active/passive receptors replaces the client/server tags.
Not tested yet; this change merely reduces the interface complexity.
diffs (truncated from 407 to 300 lines):
diff --git a/monetdb5/modules/mal/mal_init.mx b/monetdb5/modules/mal/mal_init.mx
--- a/monetdb5/modules/mal/mal_init.mx
+++ b/monetdb5/modules/mal/mal_init.mx
@@ -119,7 +119,6 @@
include opt_aliases;
include opt_coercion;
include opt_constants;
-include opt_datacell;
include opt_dataflow;
include opt_dictionary;
include opt_deadcode;
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -1,17 +1,25 @@
#A single thread for a simple stream
-#The receptor uses the prefix to lock the streams
-libary datacell;
-include basket;
-include receptor;
+#The test is based on the definition the datacell baskets X
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
-p1:= basket.new("X_p1",:bat[:lng,:lng]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1");
-basket.group("X","X_p2");
-receptor.new("X");
-receptor.setType("X","server");
-receptor.start("X","localhost",50001);
-io.print("receptors working");
-io.print(p1,p2);
+sql.init();
+
+receptor.start("datacell","X","localhost",5001,"passive");
+io.print("receptor working");
alarm.sleep(5);
-receptor.drop("X");
+receptor.pause("datacell","X");
+io.print("receptor paused");
+alarm.sleep(5);
+receptor.resume("datacell","X");
+io.print("receptor restarted");
+alarm.sleep(5);
+receptor.pause("datacell","X");
+io.print("receptor stopped");
+receptor.drop("datacell","X");
+
+# The SQL equivalents
+# call receptor.start('datacell','X','localhost',50501,'passive');
+# call receptor.pause('datacell','X');
+# call receptor.resume('datacell','X');
+# call receptor.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/datacell.sql b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -14,11 +14,20 @@
-- Copyright August 2008-2011 MonetDB B.V.
-- All Rights Reserved.
--- Datacell wrappers
-create procedure receptor (url string, act string)
- external name datacell.receptor_action;
+-- Datacell basket wrappers
--- MonetDB tuple formatted message field extractors
-create procedure datacell.register_basket(s string)
-external name datacell.register_basket;
+-- Datacell receptor wrappers
+create schema receptor;
+create procedure receptor.start (sch string, tbl string, host string, port
int, protocol string)
+ external name receptor.start;
+
+create procedure receptor.pause (sch string, tbl string)
+ external name receptor.pause;
+
+create procedure receptor.resume (sch string, tbl string)
+ external name receptor.resume;
+
+create procedure receptor.drop (sch string, tbl string)
+ external name receptor.drop;
+
diff --git a/sql/backends/monet5/datacell/receptor.mx b/sql/backends/monet5/datacell/receptor.mx
--- a/sql/backends/monet5/datacell/receptor.mx
+++ b/sql/backends/monet5/datacell/receptor.mx
@@ -20,13 +20,10 @@
@+ DataCell Receptor
This module is a prototype for the implementation of a DataCell receptor. It
can be used as follows.
@example
-(id:bat[:oid,:int, ts:bat[:oid,:timestamp], pl:bat[:oid,:int]):=
basket.new("X");
-receptor.start("X");
@end example
After this call sensors can sent tuples to the
-the stream X at the DataCell default connection.
-They are appended to the event basket
-with the usec clock tick.
+the stream X at the DataCell connection.
+They are appended to the event basket with the usec clock tick included.
Each receptor is supported by an independent thread
that reads the input and stores the data in a container
@@ -43,36 +40,24 @@
@mal
module receptor;
-command new(schema:str, grp:str):void
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
address DCreceptorNew
-comment "Define a receptor based on a basketgroup.";
-
-command setType(schema:str, tbl:str, type:str):void
-address DCreceptorSetType
-comment "Set the receptor as active or passive.
-Clients take initiatives for event submission.
-Servers are awaiting delivery of events.";
-
-command start(schema:str, tbl:str)
-address DCreceptorStart
-comment "Start a receptor thread or resume listening";
-command start(schema:str, tbl:str, host:str, port:int)
-address DCreceptorStartHostPort
-comment "Start a receptor thread or resume listening
-at a specific connection.";
+comment "Define a receptor based on a basket table.
+The receptor protocol is either active or passive. Return its handle";
command pause(schema:str, tbl:str)
address DCreceptorPause
comment "Pause listening";
-command stop(schema:str, tbl:str)
-address DCreceptorStop
-comment "Stop a receptor thread";
+command resume(schema:str, tbl:str)
+address DCreceptorResume
+comment "Resume a receptor thread";
command scenario(schema:str, tbl:str, fnme:str, sequence:int)
address DCscenario
-comment "Use a file scenario to be run a number of times";
+comment "Use a file scenario to be run a number of times.
+The receptor should be paused to make this change to take effect";
-command generator(schema:str, tbl:str,modnme:str, fnme:str)
+command generator(schema:str, tbl:str, modnme:str, fnme:str)
address DCgenerator
comment "Use a function/factory to generate the events";
@@ -92,7 +77,7 @@
#include "mtime.h"
#include "basket.h"
-/*#define _DEBUG_RECEPTOR_*/
+#define _DEBUG_RECEPTOR_
#define RCout GDKout
@-
@@ -119,13 +104,10 @@
#define adapters_export extern
#endif
-adapters_export str DCreceptorNew(int *ret, str *schema, str *grp);
-adapters_export str DCreceptorStart(int *ret, str *schema, str *nme);
-adapters_export str DCreceptorStartHostPort(int *ret, str *schema, str *nme,
str *host, int *port);
-adapters_export str DCreceptorStop(int *ret, str *schema, str *nme);
+adapters_export str DCreceptorNew(int *ret, str *schema, str *tbl, str *host,
int *port, str *proto);
+adapters_export str DCreceptorPause(int *ret, str *schema, str *nme);
+adapters_export str DCreceptorResume(int *ret, str *schema, str *nme);
adapters_export str DCreceptorDrop(int *ret, str *schema, str *nme);
-adapters_export str DCreceptorSetType(int *ret, str *schema, str *nme, str
*type);
-adapters_export str DCreceptorPause(int *ret, str *schema, str *nme);
adapters_export str DCscenario(int *ret, str *schema, str *nme, str *fnme, int
*seq);
adapters_export str DCgenerator(int *ret, str *schema, str *nme, str *modnme,
str *fcnnme);
#endif
@@ -143,6 +125,8 @@
#define RCDROP 5 /* stop reading the channel */
#define RCERROR 8 /* failed to establish the stream */
+#define RCACTIVE 1 /* ask for events */
+#define RCPASSIVE 2 /* wait for events */
/* default settings */
#define RCHOST "localhost"
#define RCPORT 55000
@@ -152,7 +136,7 @@
str name;
str host;
int port;
- int server;
+ int protocol;
int status;
int delay; /* control the delay between attempts to connect */
int lck;
@@ -206,19 +190,32 @@
The standard tuple layout for MonetDB interaction is used.
@c
str
-DCreceptorNew(int *ret, str *schema, str *tbl)
+DCreceptorNew(int *ret, str *schema, str *tbl, str *host, int *port, str
*proto)
{
Receptor rc;
- int idx, i, j, len;
+ int idx, i, j, len, protocol;
Column *fmt;
BAT *b;
if (RCfind(*schema, *tbl))
throw(MAL, "receptor.new", "Duplicate receptor");
+ idx = DClocate(*schema, *tbl);
+ if (idx == 0)
+ throw(MAL, "receptor.new", "Basket not found");
+ len = DCmemberCount(*schema, *tbl);
+ if (len == 0)
+ throw(MAL, "receptor.new", "Group has no members");
+ if ( strcmp(*proto, "active") == 0)
+ protocol = RCACTIVE;
+ else
+ if (strcmp(*proto, "passive") == 0)
+ protocol = RCPASSIVE;
+ else
+ throw(MAL, "receptor.new", "Illegal protocol");
+
rc = RCnew(*schema, *tbl);
- rc->host = 0;
- rc->port = 0;
- rc->server = 0;
+ rc->host = GDKstrdup(*host);
+ rc->port = *port;
rc->error = NULL;
rc->delay = PAUSEDEFAULT;
rc->lck = 0;
@@ -227,16 +224,10 @@
rc->sequence = 0;
rc->modnme = 0;
rc->fcnnme = 0;
- len = DCmemberCount(*schema, *tbl);
- if (len == 0)
- throw(MAL, "receptor.new", "Group has no members");
+ rc->protocol = protocol;
fmt = rc->table.format = GDKzalloc(sizeof(Column) * len);
- idx = DClocate(*schema, *tbl);
- if (idx == 0)
- throw(MAL, "receptor.new", "basket not found");
-
for (j = 0, i = 0; i < baskets[idx].colcount; i++) {
b = baskets[idx].primary[j];
if (b == NULL) {
@@ -265,40 +256,10 @@
mnstr_printf(RCout, "Instantiate a new receptor %d fields\n", j);
#endif
(void)ret;
- return MAL_SUCCEED;
-}
-
-str DCreceptorSetType(int *ret, str *schema, str *nme, str *type)
-{
- Receptor rc;
- rc = RCfind(*schema, *nme);
- if (rc == NULL)
- throw(MAL, "receptor.setType", "Receptor not defined");
- if (*type && strcmp(*type, "server") == 0) {
- rc->server = 1;
- return MAL_SUCCEED;
+ rc->status = RCLISTEN;
+ if (MT_create_thread(&rc->pid, (void (*)(void *))RCstartThread, rc,
MT_THR_DETACHED) != 0) {
+ throw(MAL, "receptor.start", "Receptor initiation failed");
}
- if (*type && strcmp(*type, "client") == 0) {
- rc->server = 0;
- return MAL_SUCCEED;
- }
- (void)ret;
- throw(MAL, "receptor.setType", "'server' or 'client' expected.");
-}
-
-str DCreceptorStop(int *ret, str *schema, str *nme)
-{
- Receptor rc;
-
- rc = RCfind(*schema, *nme);
- if (rc == NULL)
- throw(MAL, "receptor.stop", "Receptor not defined");
- rc->status = RCSTOP;
-
-#ifdef _DEBUG_RECEPTOR_
- mnstr_printf(RCout, "Stop a receptor\n");
-#endif
- (void)ret;
return MAL_SUCCEED;
}
@@ -320,46 +281,22 @@
return MAL_SUCCEED;
}
-static str
-DCreceptorInit(Receptor rc, str host, int port)
-{
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list