Changeset: 62e26fc5a062 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=62e26fc5a062
Modified Files:
        sql/backends/monet5/datacell/datacell.c
        sql/backends/monet5/datacell/datacell.h
        sql/backends/monet5/datacell/datacell.mal
        sql/backends/monet5/datacell/emitter.c
        sql/backends/monet5/datacell/emitter.h
        sql/backends/monet5/datacell/receptor.c
        sql/backends/monet5/datacell/receptor.h
Branch: default
Log Message:

Reduce interface
Extended properties are passed along with receptor/emitter


diffs (truncated from 377 to 300 lines):

diff --git a/sql/backends/monet5/datacell/datacell.c b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -23,6 +23,8 @@
 
 #include "monetdb_config.h"
 #include "datacell.h"
+#include "receptor.h"
+#include "emitter.h"
 #include "opt_datacell.h"
 #include "sql_optimizer.h"
 #include "sql_gencode.h"
@@ -134,8 +136,38 @@ DCreceptor(Client cntxt, MalBlkPtr mb, M
        str *host = (str *) getArgReference(stk, pci, 2);
        int *port = (int *) getArgReference(stk, pci, 3);
        int idx = BSKTlocate(*tbl);
+       str *protocol;
+       str *mode;
+       Receptor rc;
+       
        if (idx == 0)
                BSKTregister(cntxt, mb, stk, pci);
+       rc = RCfind(*tbl);
+       if ( pci->argc == 6 && rc != NULL ){
+               protocol = (str *) getArgReference(stk, pci, 4);
+               if ( strcmp("tcp", *protocol) == 0)
+                       rc->protocol = TCP;
+               else
+               if ( strcmp("TCP", *protocol) == 0)
+                       rc->protocol = TCP;
+               else
+               if ( strcmp("udp", *protocol) == 0)
+                       rc->protocol = TCP;
+               else
+               if ( strcmp("UDP", *protocol) == 0)
+                       rc->protocol = TCP;
+               else
+                       throw(SQL,"datacell.register","Illegal protocol");
+
+               mode = (str *) getArgReference(stk, pci, 5);
+               if ( strcmp("active", *mode) == 0)
+                       rc->mode = BSKTACTIVE;
+               else
+               if ( strcmp("passive", *mode) == 0)
+                       rc->mode = BSKTPASSIVE;
+               else
+                       throw(SQL,"datacell.register","Illegal mode");
+       }
        return RCreceptorStart(ret, tbl, host, port);
 }
 
@@ -147,18 +179,41 @@ DCemitter(Client cntxt, MalBlkPtr mb, Ma
        str *host = (str *) getArgReference(stk, pci, 2);
        int *port = (int *) getArgReference(stk, pci, 3);
        int idx = BSKTlocate(*tbl);
+       Emitter em;
+       str *protocol, *mode;
+
        if (idx == 0)
                BSKTregister(cntxt, mb, stk, pci);
+       em = EMfind(*tbl);
+       if ( pci->argc == 6 && em != NULL ){
+               protocol = (str *) getArgReference(stk, pci, 4);
+               if ( strcmp("tcp", *protocol) == 0)
+                       em->protocol = TCP;
+               else
+               if ( strcmp("TCP", *protocol) == 0)
+                       em->protocol = TCP;
+               else
+               if ( strcmp("udp", *protocol) == 0)
+                       em->protocol = TCP;
+               else
+               if ( strcmp("UDP", *protocol) == 0)
+                       em->protocol = TCP;
+               else
+                       throw(SQL,"datacell.register","Illegal protocol");
+
+               mode = (str *) getArgReference(stk, pci, 5);
+               if ( strcmp("active", *mode) == 0)
+                       em->mode = BSKTACTIVE;
+               else
+               if ( strcmp("passive", *mode) == 0)
+                       em->mode = BSKTPASSIVE;
+               else
+                       throw(SQL,"datacell.register","Illegal mode");
+       }
        return EMemitterStart(ret, tbl, host, port);
 }
 
 str
-DCregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
-       return BSKTregister(cntxt, mb, stk, pci);
-}
-
-str
 DCpauseObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        int idx, ret = 0;
@@ -329,6 +384,19 @@ DCpauseScheduler(Client cntxt, MalBlkPtr
 }
 
 str
+DCstopScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int ret = 0;
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
+       RCstop(&ret);
+       EMstop(&ret);
+       return PNstopScheduler(&ret);
+}
+
+str
 DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        int ret = 0;
diff --git a/sql/backends/monet5/datacell/datacell.h b/sql/backends/monet5/datacell/datacell.h
--- a/sql/backends/monet5/datacell/datacell.h
+++ b/sql/backends/monet5/datacell/datacell.h
@@ -45,7 +45,7 @@
 datacell_export str DCprelude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCinitialize(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCreceptor(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
-datacell_export str DCregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+datacell_export str DCemitter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCpauseObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCresumeObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCstopObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
@@ -58,8 +58,8 @@ datacell_export str DCbeat(int *ret, str
 
 datacell_export str DCpauseScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+datacell_export str DCstopScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 datacell_export str DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 
-datacell_export str DCemitter(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 
 #endif
diff --git a/sql/backends/monet5/datacell/datacell.mal b/sql/backends/monet5/datacell/datacell.mal
--- a/sql/backends/monet5/datacell/datacell.mal
+++ b/sql/backends/monet5/datacell/datacell.mal
@@ -17,11 +17,11 @@
 
 module datacell;
 
-pattern basket(tab:str):void
-address DCregister
-comment "Initialize a new basket based on a specific table definition in the datacell schema");
+pattern emitter(tbl:str, host:str, port:int)
+address DCemitter
+comment "Define a emitter based on a basket table.";
 
-pattern emitter(tbl:str, host:str, port:int)
+pattern emitter(tbl:str, host:str, port:int, proto:str, mode:str)
 address DCemitter
 comment "Define a emitter based on a basket table.";
 
@@ -29,13 +29,9 @@ pattern receptor(tbl:str, host:str, port
 address DCreceptor
 comment "Define a receptor based on a basket table..";
 
-pattern mode(tbl:str, arg:str)
-address DCmode
-comment "Change the mode of the adaptor to active/passive";
-
-pattern protocol(tbl:str, arg:str)
-address DCprotocol
-comment "Change the protocol of the adaptor UDP/TCP/CSV";
+pattern receptor(tbl:str, host:str, port:int, proto:str, mode:str)
+address DCreceptor
+comment "Define a receptor based on a basket table..";
 
 pattern pause(obj:str):void
 address DCpauseObject
@@ -67,11 +63,15 @@ comment "Convert the datacell schema to 
 
 pattern pause()
 address DCpauseScheduler
-comment "(Re)start the petrinet scheduler.";
+comment "Pause all queries";
 
 pattern resume()
 address DCresumeScheduler
-comment "Resume the petrinet scheduler.";
+comment "Resume all queries.";
+
+pattern stop():void
+address DCstopScheduler
+comment "Stop all receptors, emitters and queries";
 
 pattern postlude()
 address DCpostlude
diff --git a/sql/backends/monet5/datacell/emitter.c b/sql/backends/monet5/datacell/emitter.c
--- a/sql/backends/monet5/datacell/emitter.c
+++ b/sql/backends/monet5/datacell/emitter.c
@@ -37,30 +37,6 @@
 
 #define _DEBUG_EMITTER_
 
-typedef struct EMITTER {
-       str name;
-       str host;
-       int port;
-       int mode;       /* active/passive */
-       int protocol;   /* event protocol UDP,TCP,CSV */
-       int bskt;       /* connected to a basket */
-       int status;
-       int delay;              /* control the delay between attempts to connect */
-       int lck;
-       SOCKET sockfd;
-       SOCKET newsockfd;
-       stream *emitter;
-       str error;
-       MT_Id pid;
-       /* statistics */
-       timestamp lastseen;
-       int cycles;             /* how often emptied */
-       int pending;            /* pending events */
-       int sent;
-       Tablet table;
-       struct EMITTER *nxt, *prv;
-} EMrecord, *Emitter;
-
 static Emitter emAnchor = NULL;
 
 static str EMstartThread(Emitter em);
@@ -80,7 +56,7 @@ EMnew(str nme)
        return em;
 }
 
-static Emitter
+Emitter
 EMfind(str nme)
 {
        Emitter r;
diff --git a/sql/backends/monet5/datacell/emitter.h 
b/sql/backends/monet5/datacell/emitter.h
--- a/sql/backends/monet5/datacell/emitter.h
+++ b/sql/backends/monet5/datacell/emitter.h
@@ -27,6 +27,30 @@
 /* #define _DEBUG_EMITTER_ */
 #define EMout GDKout
 
+typedef struct EMITTER {
+       str name;
+       str host;
+       int port;
+       int mode;       /* active/passive */
+       int protocol;   /* event protocol UDP,TCP,CSV */
+       int bskt;       /* connected to a basket */
+       int status;
+       int delay;              /* control the delay between attempts to connect */
+       int lck;
+       SOCKET sockfd;
+       SOCKET newsockfd;
+       stream *emitter;
+       str error;
+       MT_Id pid;
+       /* statistics */
+       timestamp lastseen;
+       int cycles;             /* how often emptied */
+       int pending;            /* pending events */
+       int sent;
+       Tablet table;
+       struct EMITTER *nxt, *prv;
+} EMrecord, *Emitter;
+
 #ifdef WIN32
 #ifndef LIBDATACELL
 #define adapters_export extern __declspec(dllimport)
@@ -41,6 +65,7 @@ adapters_export str EMemitterStart(int *
 adapters_export str EMemitterPause(int *ret, str *nme);
 adapters_export str EMemitterResume(int *ret, str *nme);
 adapters_export str EMemitterStop(int *ret, str *nme);
+adapters_export Emitter EMfind(str nme);
 adapters_export str EMpause(int *ret);
 adapters_export str EMresume(int *ret);
 adapters_export str EMstop(int *ret);
diff --git a/sql/backends/monet5/datacell/receptor.c b/sql/backends/monet5/datacell/receptor.c
--- a/sql/backends/monet5/datacell/receptor.c
+++ b/sql/backends/monet5/datacell/receptor.c
@@ -47,32 +47,6 @@
 #define RCHOST "localhost"
 #define RCPORT 55000
 
-typedef struct RECEPTOR {
-       str name;
-       str host;
-       int port;
-       int mode;       /* active/passive */
-       int protocol;   /* event protocol UDP,TCP,CSV */
-       int bskt;       /* connected to a basket */
-       int status;
-       int delay;      /* control the delay between attempts to connect */
-       int lck;
-       str scenario;   /* use a scenario file */
-       int sequence;   /* repetition count */
-       str modnme, fcnnme; /* generic receptor generators */
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to