Changeset: 34fa674749ee for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=34fa674749ee
Modified Files:
        monetdb5/modules/mal/mal_init.mx
        sql/backends/monet5/datacell/Tests/receptor00.mal
        sql/backends/monet5/datacell/datacell.sql
        sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:

Redesign of receptor module
The design is reduced to a simple start-{pause,resume} - drop scheme.
The notion of active/passive receptors replaces the client/server tags.
No testing yet, merely reducing interface complexity.


diffs (truncated from 407 to 300 lines):

diff --git a/monetdb5/modules/mal/mal_init.mx b/monetdb5/modules/mal/mal_init.mx
--- a/monetdb5/modules/mal/mal_init.mx
+++ b/monetdb5/modules/mal/mal_init.mx
@@ -119,7 +119,6 @@
 include opt_aliases;
 include opt_coercion;
 include opt_constants;
-include opt_datacell;
 include opt_dataflow;
 include opt_dictionary;
 include opt_deadcode;
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal 
b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -1,17 +1,25 @@
 #A single thread for a simple stream
-#The receptor uses the prefix to lock the streams
-libary datacell;
-include basket;
-include receptor;
+#The test is based on the definition the datacell baskets X 
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
 
-p1:= basket.new("X_p1",:bat[:lng,:lng]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1");
-basket.group("X","X_p2");
-receptor.new("X");
-receptor.setType("X","server");
-receptor.start("X","localhost",50001);
-io.print("receptors working");
-io.print(p1,p2);
+sql.init();
+
+receptor.start("datacell","X","localhost",5001,"passive");
+io.print("receptor working");
 alarm.sleep(5);
-receptor.drop("X");
+receptor.pause("datacell","X");
+io.print("receptor paused");
+alarm.sleep(5);
+receptor.resume("datacell","X");
+io.print("receptor restarted");
+alarm.sleep(5);
+receptor.pause("datacell","X");
+io.print("receptor stopped");
+receptor.drop("datacell","X");
+
+# The SQL equivalents
+# call receptor.start('datacell','X','localhost',50501,'passive');
+# call receptor.pause('datacell','X');
+# call receptor.resume('datacell','X');
+# call receptor.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/datacell.sql 
b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -14,11 +14,20 @@
 -- Copyright August 2008-2011 MonetDB B.V.
 -- All Rights Reserved.
 
--- Datacell wrappers
-create procedure receptor (url string, act string)
-    external name datacell.receptor_action;
+-- Datacell basket  wrappers
 
--- MonetDB tuple formatted message field extractors
-create procedure datacell.register_basket(s string)
-external name datacell.register_basket;
+-- Datacell receptor wrappers
 
+create schema receptor;
+create procedure receptor.start (sch string, tbl string, host string, port 
int, protocol string)
+    external name receptor.start;
+
+create procedure receptor.pause (sch string, tbl string)
+    external name receptor.pause;
+
+create procedure receptor.resume (sch string, tbl string)
+    external name receptor.resume;
+
+create procedure receptor.drop (sch string, tbl string)
+    external name receptor.drop;
+
diff --git a/sql/backends/monet5/datacell/receptor.mx 
b/sql/backends/monet5/datacell/receptor.mx
--- a/sql/backends/monet5/datacell/receptor.mx
+++ b/sql/backends/monet5/datacell/receptor.mx
@@ -20,13 +20,10 @@
 @+ DataCell Receptor 
 This module is a prototype for the implementation of a DataCell receptor.  It 
can be used as follows.
 @example
-(id:bat[:oid,:int, ts:bat[:oid,:timestamp], pl:bat[:oid,:int]):= 
basket.new("X");
-receptor.start("X");
 @end example
 After this call sensors can sent tuples to the
-the stream X at the DataCell default connection.
-They are appended to the event basket 
-with the usec clock tick.
+the stream X at the DataCell connection.
+They are appended to the event basket with the usec clock tick included.
 
 Each receptor is supported by an independent thread
 that reads the input and stores the data in a container
@@ -43,36 +40,24 @@
 @mal
 module receptor;
 
-command new(schema:str, grp:str):void
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
 address DCreceptorNew
-comment "Define a receptor based on a basketgroup.";
-
-command setType(schema:str, tbl:str, type:str):void
-address DCreceptorSetType
-comment "Set the receptor as active or passive.
-Clients take initiatives for event submission. 
-Servers are awaiting delivery of events.";
-
-command start(schema:str, tbl:str)
-address DCreceptorStart
-comment "Start a receptor thread or resume listening";
-command start(schema:str, tbl:str, host:str, port:int)
-address DCreceptorStartHostPort
-comment "Start a receptor thread or resume listening
-at a specific connection.";
+comment "Define a receptor based on a basket table. 
+The receptor protocol is either active or passive. Return its handle";
 
 command pause(schema:str, tbl:str)
 address DCreceptorPause
 comment "Pause listening";
-command stop(schema:str, tbl:str)
-address DCreceptorStop
-comment "Stop a receptor thread";
+command resume(schema:str, tbl:str)
+address DCreceptorResume
+comment "Resume a receptor thread";
 
 command scenario(schema:str, tbl:str, fnme:str, sequence:int)
 address DCscenario
-comment "Use a file scenario to be run a number of times";
+comment "Use a file scenario to be run a number of times.
+The receptor should be paused to make this change to take effect";
 
-command generator(schema:str, tbl:str,modnme:str, fnme:str)
+command generator(schema:str, tbl:str, modnme:str, fnme:str)
 address DCgenerator
 comment "Use a function/factory to generate the events";
 
@@ -92,7 +77,7 @@
 #include "mtime.h"
 #include "basket.h"
 
-/*#define _DEBUG_RECEPTOR_*/
+#define _DEBUG_RECEPTOR_
 #define RCout GDKout  
 
 @-
@@ -119,13 +104,10 @@
 #define adapters_export extern
 #endif
 
-adapters_export str DCreceptorNew(int *ret, str *schema, str *grp);
-adapters_export str DCreceptorStart(int *ret, str *schema, str *nme);
-adapters_export str DCreceptorStartHostPort(int *ret, str *schema, str *nme, 
str *host, int *port);
-adapters_export str DCreceptorStop(int *ret, str *schema, str *nme);
+adapters_export str DCreceptorNew(int *ret, str *schema, str *tbl, str *host, 
int *port, str *proto);
+adapters_export str DCreceptorPause(int *ret, str *schema, str *nme);
+adapters_export str DCreceptorResume(int *ret, str *schema, str *nme);
 adapters_export str DCreceptorDrop(int *ret, str *schema, str *nme);
-adapters_export str DCreceptorSetType(int *ret, str *schema, str *nme, str 
*type);
-adapters_export str DCreceptorPause(int *ret, str *schema, str *nme);
 adapters_export str DCscenario(int *ret, str *schema, str *nme, str *fnme, int 
*seq);
 adapters_export str DCgenerator(int *ret, str *schema, str *nme, str *modnme, 
str *fcnnme);
 #endif
@@ -143,6 +125,8 @@
 #define RCDROP 5        /* stop reading the channel */
 #define RCERROR 8       /* failed to establish the stream */
 
+#define RCACTIVE 1             /* ask for events */
+#define RCPASSIVE 2            /* wait for events */
 /* default settings */
 #define RCHOST "localhost"
 #define RCPORT 55000
@@ -152,7 +136,7 @@
        str name;
        str host;
        int port;
-       int server;
+       int protocol;
        int status;
        int delay;  /* control the delay between attempts to connect */
        int lck;
@@ -206,19 +190,32 @@
 The standard tuple layout for MonetDB interaction is used.
 @c
 str
-DCreceptorNew(int *ret, str *schema, str *tbl)
+DCreceptorNew(int *ret, str *schema, str *tbl, str *host, int *port, str 
*proto)
 {
        Receptor rc;
-       int idx, i, j, len;
+       int idx, i, j, len, protocol;
        Column *fmt;
        BAT *b;
 
        if (RCfind(*schema, *tbl))
                throw(MAL, "receptor.new", "Duplicate receptor");
+       idx = DClocate(*schema, *tbl);
+       if (idx == 0)
+               throw(MAL, "receptor.new", "Basket not found");
+       len = DCmemberCount(*schema, *tbl);
+       if (len == 0)
+               throw(MAL, "receptor.new", "Group has no members");
+       if ( strcmp(*proto, "active") == 0) 
+               protocol = RCACTIVE;
+       else
+       if (strcmp(*proto, "passive") == 0) 
+               protocol = RCPASSIVE;
+       else
+               throw(MAL, "receptor.new", "Illegal protocol");
+
        rc = RCnew(*schema, *tbl);
-       rc->host = 0;
-       rc->port = 0;
-       rc->server = 0;
+       rc->host = GDKstrdup(*host);
+       rc->port = *port;
        rc->error = NULL;
        rc->delay = PAUSEDEFAULT;
        rc->lck = 0;
@@ -227,16 +224,10 @@
        rc->sequence = 0;
        rc->modnme = 0;
        rc->fcnnme = 0;
-       len = DCmemberCount(*schema, *tbl);
-       if (len == 0)
-               throw(MAL, "receptor.new", "Group has no members");
+       rc->protocol = protocol;
 
        fmt = rc->table.format = GDKzalloc(sizeof(Column) * len);
 
-       idx = DClocate(*schema, *tbl);
-       if (idx == 0)
-               throw(MAL, "receptor.new", "basket not found");
-
        for (j = 0, i = 0; i < baskets[idx].colcount; i++) {
                b = baskets[idx].primary[j];
                if (b == NULL) {
@@ -265,40 +256,10 @@
        mnstr_printf(RCout, "Instantiate a new receptor %d fields\n", j);
 #endif
        (void)ret;
-       return MAL_SUCCEED;
-}
-
-str DCreceptorSetType(int *ret, str *schema, str *nme, str *type)
-{
-       Receptor rc;
-       rc = RCfind(*schema, *nme);
-       if (rc == NULL)
-               throw(MAL, "receptor.setType", "Receptor not defined");
-       if (*type && strcmp(*type, "server") == 0) {
-               rc->server = 1;
-               return MAL_SUCCEED;
+       rc->status = RCLISTEN;
+       if (MT_create_thread(&rc->pid, (void (*)(void *))RCstartThread, rc, 
MT_THR_DETACHED) != 0) {
+               throw(MAL, "receptor.start", "Receptor initiation failed");
        }
-       if (*type && strcmp(*type, "client") == 0) {
-               rc->server = 0;
-               return MAL_SUCCEED;
-       }
-       (void)ret;
-       throw(MAL, "receptor.setType", "'server' or 'client' expected.");
-}
-
-str DCreceptorStop(int *ret, str *schema, str *nme)
-{
-       Receptor rc;
-
-       rc = RCfind(*schema, *nme);
-       if (rc == NULL)
-               throw(MAL, "receptor.stop", "Receptor not defined");
-       rc->status = RCSTOP;
-
-#ifdef _DEBUG_RECEPTOR_
-       mnstr_printf(RCout, "Stop a receptor\n");
-#endif
-       (void)ret;
        return MAL_SUCCEED;
 }
 
@@ -320,46 +281,22 @@
        return MAL_SUCCEED;
 }
 
-static str
-DCreceptorInit(Receptor rc, str host, int port)
-{
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to