Changeset: 9ab21ca437ff for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=9ab21ca437ff
Modified Files:
Branch: default
Log Message:
merged
diffs (truncated from 809 to 300 lines):
diff --git a/monetdb5/modules/mal/mal_init.mx b/monetdb5/modules/mal/mal_init.mx
--- a/monetdb5/modules/mal/mal_init.mx
+++ b/monetdb5/modules/mal/mal_init.mx
@@ -119,7 +119,6 @@
include opt_aliases;
include opt_coercion;
include opt_constants;
-include opt_datacell;
include opt_dataflow;
include opt_dictionary;
include opt_deadcode;
diff --git a/sql/backends/monet5/datacell/50_datacell.mal b/sql/backends/monet5/datacell/50_datacell.mal
--- a/sql/backends/monet5/datacell/50_datacell.mal
+++ b/sql/backends/monet5/datacell/50_datacell.mal
@@ -16,6 +16,7 @@
# All Rights Reserved.
# This loads the MonetDB/SQL module
+library datacell;
include basket;
include receptor;
include emitter;
diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal b/sql/backends/monet5/datacell/Tests/emitter00.mal
--- a/sql/backends/monet5/datacell/Tests/emitter00.mal
+++ b/sql/backends/monet5/datacell/Tests/emitter00.mal
@@ -1,21 +1,25 @@
-#A test for the emitter
-#The emitter uses the prefix to lock the streams
-libary datacell;
-include basket;
-include emitter;
+#A single thread for a simple stream
+#The test is based on the definition the datacell baskets X
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
-p1:= basket.new("X_p1",:bat[:lng,:int]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1","X_p2");
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
+sql.init();
-emitter.new("X");
-emitter.start("X","localhost",50001);
-io.print("emitter done");
+emitter.start("datacell","X","localhost",50502,"passive");
+io.print("emitter working");
alarm.sleep(5);
-emitter.drop("X");
+emitter.pause("datacell","X");
+io.print("emitter paused");
+alarm.sleep(5);
+emitter.resume("datacell","X");
+io.print("emitter restarted");
+alarm.sleep(5);
+emitter.pause("datacell","X");
+io.print("emitter stopped");
+emitter.drop("datacell","X");
+
+# The SQL equivalents
+# call emitter.start('datacell','X','localhost',50502,'passive');
+# call emitter.pause('datacell','X');
+# call emitter.resume('datacell','X');
+# call emitter.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -1,17 +1,25 @@
#A single thread for a simple stream
-#The receptor uses the prefix to lock the streams
-libary datacell;
-include basket;
-include receptor;
+#The test is based on the definition the datacell baskets X
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
-p1:= basket.new("X_p1",:bat[:lng,:lng]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1");
-basket.group("X","X_p2");
-receptor.new("X");
-receptor.setType("X","server");
-receptor.start("X","localhost",50001);
-io.print("receptors working");
-io.print(p1,p2);
+sql.init();
+
+receptor.start("datacell","X","localhost",50501,"passive");
+io.print("receptor working");
alarm.sleep(5);
-receptor.drop("X");
+receptor.pause("datacell","X");
+io.print("receptor paused");
+alarm.sleep(5);
+receptor.resume("datacell","X");
+io.print("receptor restarted");
+alarm.sleep(5);
+receptor.pause("datacell","X");
+io.print("receptor stopped");
+receptor.drop("datacell","X");
+
+# The SQL equivalents
+# call receptor.start('datacell','X','localhost',50501,'passive');
+# call receptor.pause('datacell','X');
+# call receptor.resume('datacell','X');
+# call receptor.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/datacell.sql b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -14,11 +14,35 @@
-- Copyright August 2008-2011 MonetDB B.V.
-- All Rights Reserved.
--- Datacell wrappers
-create procedure receptor (url string, act string)
- external name datacell.receptor_action;
+-- Datacell basket wrappers
--- MonetDB tuple formatted message field extractors
-create procedure datacell.register_basket(s string)
-external name datacell.register_basket;
+-- Datacell receptor wrappers
+create schema receptor;
+create procedure receptor.start (sch string, tbl string, host string, port int, protocol string)
+ external name receptor.start;
+
+create procedure receptor.pause (sch string, tbl string)
+ external name receptor.pause;
+
+create procedure receptor.resume (sch string, tbl string)
+ external name receptor.resume;
+
+create procedure receptor.drop (sch string, tbl string)
+ external name receptor.drop;
+
+-- Datacell emitter wrappers
+
+create schema emitter;
+create procedure emitter.start (sch string, tbl string, host string, port int, protocol string)
+ external name emitter.start;
+
+create procedure emitter.pause (sch string, tbl string)
+ external name emitter.pause;
+
+create procedure emitter.resume (sch string, tbl string)
+ external name emitter.resume;
+
+create procedure emitter.drop (sch string, tbl string)
+ external name emitter.drop;
+
diff --git a/sql/backends/monet5/datacell/emitter.mx b/sql/backends/monet5/datacell/emitter.mx
--- a/sql/backends/monet5/datacell/emitter.mx
+++ b/sql/backends/monet5/datacell/emitter.mx
@@ -21,9 +21,6 @@
This module is a prototype for the implementation of a
DataCell emitter. It can be used as follows.
@example
-pl:= basket.new("X_p1",:bat[:lng, :int]);
-emitter.new("Y");
-emitter.start("Y","localhost",50000);
@end example
After this call it will sent tuples from basket X_p1
to the stream Y at the localhost default port.
@@ -41,31 +38,24 @@
@mal
module emitter;
-command new(sch:str,tbl:str):void
+
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
address DCemitterNew
-comment "Define a new emitter.";
+comment "Define a emitter based on a basket table.
+The emitter protocol is either active or passive. Return its handle";
-command setType(sch:str,tbl:str, type:str):void
-address DCemitterSetType
-comment "Set the emitter as a server or a client.";
+command pause(schema:str, tbl:str)
+address DCemitterPause
+comment "Pause listening";
-command start(sch:str,tbl:str)
-address DCemitterStart
-comment "Start an emitter thread";
-command start(sch:str,tbl:str, host:str, port:int)
-address DCemitterStartFull
-comment "Start an emitter thread";
+command resume(schema:str, tbl:str)
+address DCemitterResume
+comment "Resume a emitter thread";
-command pause(sch:str,tbl:str)
-address DCemitterPause
-comment "Pause sending";
-command stop(sch:str,tbl:str)
-address DCemitterStop
-comment "Stop an emitter thread";
+command drop(schema:str, tb:str)
+address DCemitterDrop
+comment "Drop a emitter";
-command drop(sch:str,tbl:str)
-address DCemitterDrop
-comment "Drop the emitter";
@-
@{
@+ Implementation
@@ -79,7 +69,7 @@
#include "mtime.h"
#include "basket.h"
-/* #define _DEBUG_EMITTER_ */
+#define _DEBUG_EMITTER_
#define EMout GDKout
#ifdef WIN32
@@ -92,13 +82,11 @@
#define adapters_export extern
#endif
-adapters_export str DCemitterNew(int *ret, str *schema, str *grp);
-adapters_export str DCemitterSetType(int *ret, str *schema, str *grp, str *type);
-adapters_export str DCemitterStart(int *ret, str *schema, str *nme);
-adapters_export str DCemitterStartFull(int *ret, str *schema, str *nme, str *host, int *port);
-adapters_export str DCemitterStop(int *ret, str *schema, str *nme);
+adapters_export str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto);
adapters_export str DCemitterPause(int *ret, str *schema, str *nme);
+adapters_export str DCemitterResume(int *ret, str *schema, str *nme);
adapters_export str DCemitterDrop(int *ret, str *schema, str *nme);
+
#endif
@c
@@ -113,6 +101,9 @@
#define EMDROP 5
#define EMERROR 8 /* failed to establish the stream */
+#define EMPASSIVE 1
+#define EMACTIVE 2
+
#define TCP 1
#define UDP 2
static int protocol= TCP;
@@ -125,7 +116,7 @@
str host;
int port;
int status;
- int server;/* control the delay between attempts to connect */
+ int protocol;/* control the delay between attempts to connect */
int delay;
int lck;
SOCKET sockfd;
@@ -170,28 +161,41 @@
The baskets should already be defined. There order
is used to interpret the messages sent.
@c
-str DCemitterNew(int *ret, str *schema, str *grp)
+str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str *proto)
{
Emitter em;
- int idx, i, j, len;
+ int protocol, idx, i, j, len;
BAT *b;
- if (EMfind(*schema, *grp))
+ if (EMfind(*schema, *tbl))
throw(MAL, "emitter.new", "Duplicate emitter");
- em = EMnew(*schema, *grp);
- em->host = GDKstrdup("localhost");
- em->port = 50001;
- em->delay = PAUSEDEFAULT;
- em->lck = 0;
- em->error = NULL;
- em->server = 1;
@-
All tables are prepended with a default tick bat.
It becomes the synchronization handle.
@c
- len = DCmemberCount(*schema, *grp);
+ len = DCmemberCount(*schema, *tbl);
if (len == 0)
- throw(MAL, "receptor.new", "Group has no members");
+ throw(MAL, "emitter.new", "Group has no members");
+
+ idx = DClocate(*schema, *tbl);
+ if (idx == 0)
+ throw(MAL, "emitter.new", "basket not found");
+
+ if ( strcmp(*proto, "active") == 0)
+ protocol = EMACTIVE;
+ else
+ if ( strcmp(*proto, "passive") == 0)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list