Changeset: d36e6bd177bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d36e6bd177bc
Modified Files:
sql/backends/monet5/datacell/Tests/emitter00.mal
sql/backends/monet5/datacell/Tests/receptor00.mal
sql/backends/monet5/datacell/datacell.sql
sql/backends/monet5/datacell/emitter.mx
sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:
Redesign of emitter module
The design is reduced to a simple start-{pause,resume} - drop scheme.
The notion of active/passive emitter replaces the client/server tags.
No testing yet, merely reducing interface complexity.
diffs (truncated from 420 to 300 lines):
diff --git a/sql/backends/monet5/datacell/Tests/emitter00.mal
b/sql/backends/monet5/datacell/Tests/emitter00.mal
--- a/sql/backends/monet5/datacell/Tests/emitter00.mal
+++ b/sql/backends/monet5/datacell/Tests/emitter00.mal
@@ -1,21 +1,25 @@
-#A test for the emitter
-#The emitter uses the prefix to lock the streams
-libary datacell;
-include basket;
-include emitter;
+#A single thread for a simple stream
+#The test is based on the definition the datacell baskets X
+# it starts a receptor in debug mode to see arrivals coming
+# from a (client) sensor.
-p1:= basket.new("X_p1",:bat[:lng,:int]);
-p2:= basket.new("X_p2",:bat[:lng,:int]);
-basket.group("X","X_p1","X_p2");
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
-t:= alarm.usec();
-bat.insert(p1,t,1);
-bat.insert(p2,t,1);
+sql.init();
-emitter.new("X");
-emitter.start("X","localhost",50001);
-io.print("emitter done");
+emitter.start("datacell","X","localhost",50502,"passive");
+io.print("emitter working");
alarm.sleep(5);
-emitter.drop("X");
+emitter.pause("datacell","X");
+io.print("emitter paused");
+alarm.sleep(5);
+emitter.resume("datacell","X");
+io.print("emitter restarted");
+alarm.sleep(5);
+emitter.pause("datacell","X");
+io.print("emitter stopped");
+emitter.drop("datacell","X");
+
+# The SQL equivalents
+# call emitter.start('datacell','X','localhost',50502,'passive');
+# call emitter.pause('datacell','X');
+# call emitter.resume('datacell','X');
+# call emitter.drop('datacell','X');
diff --git a/sql/backends/monet5/datacell/Tests/receptor00.mal
b/sql/backends/monet5/datacell/Tests/receptor00.mal
--- a/sql/backends/monet5/datacell/Tests/receptor00.mal
+++ b/sql/backends/monet5/datacell/Tests/receptor00.mal
@@ -5,7 +5,7 @@
sql.init();
-receptor.start("datacell","X","localhost",5001,"passive");
+receptor.start("datacell","X","localhost",50501,"passive");
io.print("receptor working");
alarm.sleep(5);
receptor.pause("datacell","X");
diff --git a/sql/backends/monet5/datacell/datacell.sql
b/sql/backends/monet5/datacell/datacell.sql
--- a/sql/backends/monet5/datacell/datacell.sql
+++ b/sql/backends/monet5/datacell/datacell.sql
@@ -31,3 +31,18 @@
create procedure receptor.drop (sch string, tbl string)
external name receptor.drop;
+-- Datacell emitter wrappers
+
+create schema emitter;
+create procedure emitter.start (sch string, tbl string, host string, port int,
protocol string)
+ external name emitter.start;
+
+create procedure emitter.pause (sch string, tbl string)
+ external name emitter.pause;
+
+create procedure emitter.resume (sch string, tbl string)
+ external name emitter.resume;
+
+create procedure emitter.drop (sch string, tbl string)
+ external name emitter.drop;
+
diff --git a/sql/backends/monet5/datacell/emitter.mx
b/sql/backends/monet5/datacell/emitter.mx
--- a/sql/backends/monet5/datacell/emitter.mx
+++ b/sql/backends/monet5/datacell/emitter.mx
@@ -21,9 +21,6 @@
This module is a prototype for the implementation of a
DataCell emitter. It can be used as follows.
@example
-pl:= basket.new("X_p1",:bat[:lng, :int]);
-emitter.new("Y");
-emitter.start("Y","localhost",50000);
@end example
After this call it will sent tuples from basket X_p1
to the stream Y at the localhost default port.
@@ -41,31 +38,24 @@
@mal
module emitter;
-command new(sch:str,tbl:str):void
+
+command start{unsafe}(schema:str, tbl:str, host:str, port:int, prot:str)
address DCemitterNew
-comment "Define a new emitter.";
+comment "Define a emitter based on a basket table.
+The emitter protocol is either active or passive. Return its handle";
-command setType(sch:str,tbl:str, type:str):void
-address DCemitterSetType
-comment "Set the emitter as a server or a client.";
+command pause(schema:str, tbl:str)
+address DCemitterPause
+comment "Pause listening";
-command start(sch:str,tbl:str)
-address DCemitterStart
-comment "Start an emitter thread";
-command start(sch:str,tbl:str, host:str, port:int)
-address DCemitterStartFull
-comment "Start an emitter thread";
+command resume(schema:str, tbl:str)
+address DCemitterResume
+comment "Resume a emitter thread";
-command pause(sch:str,tbl:str)
-address DCemitterPause
-comment "Pause sending";
-command stop(sch:str,tbl:str)
-address DCemitterStop
-comment "Stop an emitter thread";
+command drop(schema:str, tb:str)
+address DCemitterDrop
+comment "Drop a emitter";
-command drop(sch:str,tbl:str)
-address DCemitterDrop
-comment "Drop the emitter";
@-
@{
@+ Implementation
@@ -79,7 +69,7 @@
#include "mtime.h"
#include "basket.h"
-/* #define _DEBUG_EMITTER_ */
+#define _DEBUG_EMITTER_
#define EMout GDKout
#ifdef WIN32
@@ -92,13 +82,11 @@
#define adapters_export extern
#endif
-adapters_export str DCemitterNew(int *ret, str *schema, str *grp);
-adapters_export str DCemitterSetType(int *ret, str *schema, str *grp, str
*type);
-adapters_export str DCemitterStart(int *ret, str *schema, str *nme);
-adapters_export str DCemitterStartFull(int *ret, str *schema, str *nme, str
*host, int *port);
-adapters_export str DCemitterStop(int *ret, str *schema, str *nme);
+adapters_export str DCemitterNew(int *ret, str *schema, str *tbl, str *host,
int *port, str *proto);
adapters_export str DCemitterPause(int *ret, str *schema, str *nme);
+adapters_export str DCemitterResume(int *ret, str *schema, str *nme);
adapters_export str DCemitterDrop(int *ret, str *schema, str *nme);
+
#endif
@c
@@ -113,6 +101,9 @@
#define EMDROP 5
#define EMERROR 8 /* failed to establish the stream */
+#define EMPASSIVE 1
+#define EMACTIVE 2
+
#define TCP 1
#define UDP 2
static int protocol= TCP;
@@ -125,7 +116,7 @@
str host;
int port;
int status;
- int server;/* control the delay between attempts to connect */
+ int protocol;/* control the delay between attempts to connect */
int delay;
int lck;
SOCKET sockfd;
@@ -170,28 +161,41 @@
The baskets should already be defined. There order
is used to interpret the messages sent.
@c
-str DCemitterNew(int *ret, str *schema, str *grp)
+str DCemitterNew(int *ret, str *schema, str *tbl, str *host, int *port, str
*proto)
{
Emitter em;
- int idx, i, j, len;
+ int protocol, idx, i, j, len;
BAT *b;
- if (EMfind(*schema, *grp))
+ if (EMfind(*schema, *tbl))
throw(MAL, "emitter.new", "Duplicate emitter");
- em = EMnew(*schema, *grp);
- em->host = GDKstrdup("localhost");
- em->port = 50001;
- em->delay = PAUSEDEFAULT;
- em->lck = 0;
- em->error = NULL;
- em->server = 1;
@-
All tables are prepended with a default tick bat.
It becomes the synchronization handle.
@c
- len = DCmemberCount(*schema, *grp);
+ len = DCmemberCount(*schema, *tbl);
if (len == 0)
- throw(MAL, "receptor.new", "Group has no members");
+ throw(MAL, "emitter.new", "Group has no members");
+
+ idx = DClocate(*schema, *tbl);
+ if (idx == 0)
+ throw(MAL, "emitter.new", "basket not found");
+
+ if ( strcmp(*proto, "active") == 0)
+ protocol = EMACTIVE;
+ else
+ if ( strcmp(*proto, "passive") == 0)
+ protocol = EMPASSIVE;
+ else
+ throw(MAL, "emitter.new", "Illegal protocol");
+
+ em = EMnew(*schema, *tbl);
+ em->host = GDKstrdup(*host);
+ em->port = *port;
+ em->delay = PAUSEDEFAULT;
+ em->lck = 0;
+ em->error = NULL;
+ em->protocol = protocol;
em->table.format = GDKzalloc(sizeof(Column) * (len + 1));
em->table.format[0].c[0] = NULL;
@@ -200,10 +204,6 @@
em->table.format[0].seplen = (int)strlen(em->table.format[0].sep);
em->status = EMSTOP;
- idx = DClocate(*schema, *grp);
- if (idx == 0)
- throw(MAL, "receptor.new", "basket not found");
-
for (j = 0, i = 0; i < baskets[idx].colcount; i++) {
b = baskets[idx].primary[j];
if (b == NULL) {
@@ -234,41 +234,11 @@
#ifdef _DEBUG_EMITTER_
mnstr_printf(EMout, "Instantiate a new emitter %d fields\n", i);
#endif
- return MAL_SUCCEED;
-}
-
-str DCemitterSetType(int *ret, str *schema, str *nme, str *type)
-{
- Emitter em;
-
+ em->status = EMLISTEN;
(void)ret;
- em = EMfind(*schema, *nme);
- if (em == NULL)
- throw(MAL, "emitter.setType", "Emitter not defined");
- if (strcmp(*type, "server") == 0) {
- em->server = 1;
- return MAL_SUCCEED;
+ if ( MT_create_thread(&em->pid, (void (*)(void *))EMstartThread, em,
MT_THR_DETACHED) != 0) {
+ throw(MAL, "emitter.start", "Emitter initiation failed");
}
- if (strcmp(*type, "client") == 0) {
- em->server = 0;
- return MAL_SUCCEED;
- }
- throw(MAL, "emitter.setType", "The type should be <server> or
<client>.");
-}
-
-str DCemitterStop(int *ret, str *schema, str *nme)
-{
- Emitter em;
-
- em = EMfind(*schema, *nme);
- if (em == NULL)
- throw(MAL, "emitter.stop", "Emitter not defined");
- em->status = EMSTOP;
-
-#ifdef _DEBUG_EMITTER_
- mnstr_printf(EMout, "Stop a new emitter\n");
-#endif
- (void)ret;
return MAL_SUCCEED;
}
@@ -291,55 +261,19 @@
return MAL_SUCCEED;
}
-str DCemitterStart(int *ret, str *schema, str *nme)
+str DCemitterResume(int *ret, str *schema, str *nme)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list