Changeset: 9ab21ca437ff for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9ab21ca437ff
Modified Files:
        
Branch: default
Log Message:

merged


diffs (truncated from 809 to 300 lines):

diff --git a/monetdb5/modules/mal/mal_init.mx b/monetdb5/modules/mal/mal_init.mx
--- a/monetdb5/modules/mal/mal_init.mx
+++ b/monetdb5/modules/mal/mal_init.mx
@@ -119,7 +119,6 @@
 include opt_aliases;
 include opt_coercion;
 include opt_constants;
-include opt_datacell;
 include opt_dataflow;
 include opt_dictionary;
 include opt_deadcode;
diff --git a/sql/backends/monet5/datacell/50_datacell.mal b/sql/backends/monet5/datacell/50_datacell.mal
--- a/sql/backends/monet5/datacell/50_datacell.mal
+++ b/sql/backends/monet5/datacell/50_datacell.mal
@@ -16,6 +16,7 @@
 # All Rights Reserved.
 
 # This loads the MonetDB/SQL module
+library datacell;
 include basket;
 include receptor;
 include emitter;
diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal b/sql/backends/monet5/datacell/Tests/emitter00.mal
--- a/sql/backends/monet5/datacell/Tests/emitter00.mal
+++ b/sql/backends/monet5/datacell/Tests/emitter00.mal
@@ -1,21 +1,25 @@
-#A test for the emitter
-#The emitter uses the prefix to lock the streams
-libary datacell;
-include basket;
-include emitter;
+#A single thread for a simple stream
+#The test is based on the definition the datacell baskets X 
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
 
-p1:= basket.new("X_p1",:bat[:lng,:int]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1","X_p2");
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
+sql.init();
 
-emitter.new("X");
-emitter.start("X","localhost",50001);
-io.print("emitter done");
+emitter.start("datacell","X","localhost",50502,"passive");
+io.print("emitter working");
 alarm.sleep(5);
-emitter.drop("X");
+emitter.pause("datacell","X");
+io.print("emitter paused");
+alarm.sleep(5);
+emitter.resume("datacell","X");
+io.print("emitter restarted");
+alarm.sleep(5);
+emitter.pause("datacell","X");
+io.print("emitter stopped");
+emitter.drop("datacell","X");
+
+# The SQL equivalents
+# call emitter.start('datacell','X','localhost',50502,'passive');
+# call emitter.pause('datacell','X');
+# call emitter.resume('datacell','X');
+# call emitter.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -1,17 +1,25 @@
 #A single thread for a simple stream
-#The receptor uses the prefix to lock the streams
-libary datacell;
-include basket;
-include receptor;
+#The test is based on the definition the datacell baskets X 
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
 
-p1:= basket.new("X_p1",:bat[:lng,:lng]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1");
-basket.group("X","X_p2");
-receptor.new("X");
-receptor.setType("X","server");
-receptor.start("X","localhost",50001);
-io.print("receptors working");
-io.print(p1,p2);
+sql.init();
+
+receptor.start("datacell","X","localhost",50501,"passive");
+io.print("receptor working");
 alarm.sleep(5);
-receptor.drop("X");
+receptor.pause("datacell","X");
+io.print("receptor paused");
+alarm.sleep(5);
+receptor.resume("datacell","X");
+io.print("receptor restarted");
+alarm.sleep(5);
+receptor.pause("datacell","X");
+io.print("receptor stopped");
+receptor.drop("datacell","X");
+
+# The SQL equivalents
+# call receptor.start('datacell','X','localhost',50501,'passive');
+# call receptor.pause('datacell','X');
+# call receptor.resume('datacell','X');
+# call receptor.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/datacell.sql b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -14,11 +14,35 @@
 -- Copyright August 2008-2011 MonetDB B.V.
 -- All Rights Reserved.
 
--- Datacell wrappers
-create procedure receptor (url string, act string)
-    external name datacell.receptor_action;
+-- Datacell basket  wrappers
 
--- MonetDB tuple formatted message field extractors
-create procedure datacell.register_basket(s string)
-external name datacell.register_basket;
+-- Datacell receptor wrappers
 
+create schema receptor;
+create procedure receptor.start (sch string, tbl string, host string, port int, protocol string)
+    external name receptor.start;
+
+create procedure receptor.pause (sch string, tbl string)
+    external name receptor.pause;
+
+create procedure receptor.resume (sch string, tbl string)
+    external name receptor.resume;
+
+create procedure receptor.drop (sch string, tbl string)
+    external name receptor.drop;
+
+-- Datacell emitter wrappers
+
+create schema emitter;
+create procedure emitter.start (sch string, tbl string, host string, port int, protocol string)
+    external name emitter.start;
+
+create procedure emitter.pause (sch string, tbl string)
+    external name emitter.pause;
+
+create procedure emitter.resume (sch string, tbl string)
+    external name emitter.resume;
+
+create procedure emitter.drop (sch string, tbl string)
+    external name emitter.drop;
+
diff --git a/sql/backends/monet5/datacell/emitter.mx b/sql/backends/monet5/datacell/emitter.mx
--- a/sql/backends/monet5/datacell/emitter.mx
+++ b/sql/backends/monet5/datacell/emitter.mx
@@ -21,9 +21,6 @@
 This module is a prototype for the implementation of a
 DataCell emitter.  It can be used as follows.
 @example
-pl:= basket.new("X_p1",:bat[:lng, :int]);
-emitter.new("Y");
-emitter.start("Y","localhost",50000);
 @end example
 After this call it will sent tuples from basket X_p1
 to the stream Y at the localhost default port.
@@ -41,31 +38,24 @@
 
 @mal
 module emitter;
-command new(sch:str,tbl:str):void
+
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
 address DCemitterNew
-comment "Define a new emitter.";
+comment "Define a emitter based on a basket table. 
+The emitter protocol is either active or passive. Return its handle";
 
-command setType(sch:str,tbl:str, type:str):void
-address DCemitterSetType
-comment "Set the emitter as a server or a client.";
+command pause(schema:str, tbl:str)
+address DCemitterPause
+comment "Pause listening";
 
-command start(sch:str,tbl:str)
-address DCemitterStart
-comment "Start an emitter thread";
-command start(sch:str,tbl:str, host:str, port:int)
-address DCemitterStartFull
-comment "Start an emitter thread";
+command resume(schema:str, tbl:str)
+address DCemitterResume
+comment "Resume a emitter thread";
 
-command pause(sch:str,tbl:str)
-address DCemitterPause
-comment "Pause sending";
-command stop(sch:str,tbl:str)
-address DCemitterStop
-comment "Stop an emitter thread";
+command drop(schema:str, tb:str)
+address DCemitterDrop
+comment "Drop a emitter";
 
-command drop(sch:str,tbl:str)
-address DCemitterDrop
-comment "Drop the emitter";
 @-
 @{
 @+ Implementation
@@ -79,7 +69,7 @@
 #include "mtime.h"
 #include "basket.h"
 
-/* #define _DEBUG_EMITTER_ */
+#define _DEBUG_EMITTER_ 
 #define EMout GDKout
 
 #ifdef WIN32
@@ -92,13 +82,11 @@
 #define adapters_export extern
 #endif
 
-adapters_export str DCemitterNew(int *ret, str *schema, str *grp);
-adapters_export str DCemitterSetType(int *ret, str *schema, str *grp, str *type);
-adapters_export str DCemitterStart(int *ret, str *schema, str *nme);
-adapters_export str DCemitterStartFull(int *ret, str *schema, str *nme, str *host, int *port);
-adapters_export str DCemitterStop(int *ret, str *schema, str *nme);
+adapters_export str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto);
 adapters_export str DCemitterPause(int *ret, str *schema, str *nme);
+adapters_export str DCemitterResume(int *ret, str *schema, str *nme);
 adapters_export str DCemitterDrop(int *ret, str *schema, str *nme);
+
 #endif
 
 @c
@@ -113,6 +101,9 @@
 #define EMDROP 5               
 #define EMERROR 8               /* failed to establish the stream */
 
+#define EMPASSIVE 1
+#define EMACTIVE 2
+
 #define TCP 1
 #define UDP 2
 static int protocol= TCP;
@@ -125,7 +116,7 @@
        str host;
        int port;
        int status;
-       int server;/* control the delay between attempts to connect */
+       int protocol;/* control the delay between attempts to connect */
        int delay;
        int lck;
        SOCKET sockfd;
@@ -170,28 +161,41 @@
 The baskets should already be defined. There order
 is used to interpret the messages sent.
 @c
-str DCemitterNew(int *ret, str *schema, str *grp)
+str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto)
 {
        Emitter em;
-       int idx, i, j, len;
+       int protocol, idx, i, j, len;
        BAT *b;
 
-       if (EMfind(*schema, *grp))
+       if (EMfind(*schema, *tbl))
                throw(MAL, "emitter.new", "Duplicate emitter");
-       em = EMnew(*schema, *grp);
-       em->host = GDKstrdup("localhost");
-       em->port = 50001;
-       em->delay = PAUSEDEFAULT;
-       em->lck = 0;
-       em->error = NULL;
-       em->server = 1;
 @-
 All tables are prepended with a default tick bat.
 It becomes the synchronization handle.
 @c
-       len = DCmemberCount(*schema, *grp);
+       len = DCmemberCount(*schema, *tbl);
        if (len == 0)
-               throw(MAL, "receptor.new", "Group has no members");
+               throw(MAL, "emitter.new", "Group has no members");
+
+       idx = DClocate(*schema, *tbl);
+       if (idx == 0)
+               throw(MAL, "emitter.new", "basket not found");
+
+       if ( strcmp(*proto, "active") == 0)
+               protocol = EMACTIVE;
+       else
+       if ( strcmp(*proto, "passive") == 0)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to