Changeset: 62e26fc5a062 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=62e26fc5a062
Modified Files:
sql/backends/monet5/datacell/datacell.c
sql/backends/monet5/datacell/datacell.h
sql/backends/monet5/datacell/datacell.mal
sql/backends/monet5/datacell/emitter.c
sql/backends/monet5/datacell/emitter.h
sql/backends/monet5/datacell/receptor.c
sql/backends/monet5/datacell/receptor.h
Branch: default
Log Message:
Reduce interface
Extended properties are passed along with receptor/emitter
diffs (truncated from 377 to 300 lines):
diff --git a/sql/backends/monet5/datacell/datacell.c
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -23,6 +23,8 @@
#include "monetdb_config.h"
#include "datacell.h"
+#include "receptor.h"
+#include "emitter.h"
#include "opt_datacell.h"
#include "sql_optimizer.h"
#include "sql_gencode.h"
@@ -134,8 +136,38 @@ DCreceptor(Client cntxt, MalBlkPtr mb, M
str *host = (str *) getArgReference(stk, pci, 2);
int *port = (int *) getArgReference(stk, pci, 3);
int idx = BSKTlocate(*tbl);
+ str *protocol;
+ str *mode;
+ Receptor rc;
+
if (idx == 0)
BSKTregister(cntxt, mb, stk, pci);
+ rc = RCfind(*tbl);
+ if ( pci->argc == 6 && rc != NULL ){
+ protocol = (str *) getArgReference(stk, pci, 4);
+ if ( strcmp("tcp", *protocol) == 0)
+ rc->protocol = TCP;
+ else
+ if ( strcmp("TCP", *protocol) == 0)
+ rc->protocol = TCP;
+ else
+ if ( strcmp("udp", *protocol) == 0)
+ rc->protocol = TCP;
+ else
+ if ( strcmp("UDP", *protocol) == 0)
+ rc->protocol = TCP;
+ else
+ throw(SQL,"datacell.register","Illegal protocol");
+
+ mode = (str *) getArgReference(stk, pci, 5);
+ if ( strcmp("active", *mode) == 0)
+ rc->mode = BSKTACTIVE;
+ else
+ if ( strcmp("passive", *mode) == 0)
+ rc->mode = BSKTPASSIVE;
+ else
+ throw(SQL,"datacell.register","Illegal mode");
+ }
return RCreceptorStart(ret, tbl, host, port);
}
@@ -147,18 +179,41 @@ DCemitter(Client cntxt, MalBlkPtr mb, Ma
str *host = (str *) getArgReference(stk, pci, 2);
int *port = (int *) getArgReference(stk, pci, 3);
int idx = BSKTlocate(*tbl);
+ Emitter em;
+ str *protocol, *mode;
+
if (idx == 0)
BSKTregister(cntxt, mb, stk, pci);
+ em = EMfind(*tbl);
+ if ( pci->argc == 6 && em != NULL ){
+ protocol = (str *) getArgReference(stk, pci, 4);
+ if ( strcmp("tcp", *protocol) == 0)
+ em->protocol = TCP;
+ else
+ if ( strcmp("TCP", *protocol) == 0)
+ em->protocol = TCP;
+ else
+ if ( strcmp("udp", *protocol) == 0)
+ em->protocol = TCP;
+ else
+ if ( strcmp("UDP", *protocol) == 0)
+ em->protocol = TCP;
+ else
+ throw(SQL,"datacell.register","Illegal protocol");
+
+ mode = (str *) getArgReference(stk, pci, 5);
+ if ( strcmp("active", *mode) == 0)
+ em->mode = BSKTACTIVE;
+ else
+ if ( strcmp("passive", *mode) == 0)
+ em->mode = BSKTPASSIVE;
+ else
+ throw(SQL,"datacell.register","Illegal mode");
+ }
return EMemitterStart(ret, tbl, host, port);
}
str
-DCregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- return BSKTregister(cntxt, mb, stk, pci);
-}
-
-str
DCpauseObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
int idx, ret = 0;
@@ -329,6 +384,19 @@ DCpauseScheduler(Client cntxt, MalBlkPtr
}
str
+DCstopScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ int ret = 0;
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
+ RCstop(&ret);
+ EMstop(&ret);
+ return PNstopScheduler(&ret);
+}
+
+str
DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
int ret = 0;
diff --git a/sql/backends/monet5/datacell/datacell.h
b/sql/backends/monet5/datacell/datacell.h
--- a/sql/backends/monet5/datacell/datacell.h
+++ b/sql/backends/monet5/datacell/datacell.h
@@ -45,7 +45,7 @@
datacell_export str DCprelude(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCinitialize(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCreceptor(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
-datacell_export str DCregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+datacell_export str DCemitter(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCpauseObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCresumeObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCstopObject(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
@@ -58,8 +58,8 @@ datacell_export str DCbeat(int *ret, str
datacell_export str DCpauseScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
datacell_export str DCresumeScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr
stk, InstrPtr pci);
+datacell_export str DCstopScheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
datacell_export str DCpostlude(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
-datacell_export str DCemitter(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif
diff --git a/sql/backends/monet5/datacell/datacell.mal
b/sql/backends/monet5/datacell/datacell.mal
--- a/sql/backends/monet5/datacell/datacell.mal
+++ b/sql/backends/monet5/datacell/datacell.mal
@@ -17,11 +17,11 @@
module datacell;
-pattern basket(tab:str):void
-address DCregister
-comment "Initialize a new basket based on a specific table definition in the
datacell schema");
+pattern emitter(tbl:str, host:str, port:int)
+address DCemitter
+comment "Define a emitter based on a basket table.";
-pattern emitter(tbl:str, host:str, port:int)
+pattern emitter(tbl:str, host:str, port:int, proto:str, mode:str)
address DCemitter
comment "Define a emitter based on a basket table.";
@@ -29,13 +29,9 @@ pattern receptor(tbl:str, host:str, port
address DCreceptor
comment "Define a receptor based on a basket table..";
-pattern mode(tbl:str, arg:str)
-address DCmode
-comment "Change the mode of the adaptor to active/passive";
-
-pattern protocol(tbl:str, arg:str)
-address DCprotocol
-comment "Change the protocol of the adaptor UDP/TCP/CSV";
+pattern receptor(tbl:str, host:str, port:int, proto:str, mode:str)
+address DCreceptor
+comment "Define a receptor based on a basket table..";
pattern pause(obj:str):void
address DCpauseObject
@@ -67,11 +63,15 @@ comment "Convert the datacell schema to
pattern pause()
address DCpauseScheduler
-comment "(Re)start the petrinet scheduler.";
+comment "Pause all queries";
pattern resume()
address DCresumeScheduler
-comment "Resume the petrinet scheduler.";
+comment "Resume all queries.";
+
+pattern stop():void
+address DCstopScheduler
+comment "Stop all receptors, emitters and queries";
pattern postlude()
address DCpostlude
diff --git a/sql/backends/monet5/datacell/emitter.c
b/sql/backends/monet5/datacell/emitter.c
--- a/sql/backends/monet5/datacell/emitter.c
+++ b/sql/backends/monet5/datacell/emitter.c
@@ -37,30 +37,6 @@
#define _DEBUG_EMITTER_
-typedef struct EMITTER {
- str name;
- str host;
- int port;
- int mode; /* active/passive */
- int protocol; /* event protocol UDP,TCP,CSV */
- int bskt; /* connected to a basket */
- int status;
- int delay; /* control the delay between attempts to
connect */
- int lck;
- SOCKET sockfd;
- SOCKET newsockfd;
- stream *emitter;
- str error;
- MT_Id pid;
- /* statistics */
- timestamp lastseen;
- int cycles; /* how often emptied */
- int pending; /* pending events */
- int sent;
- Tablet table;
- struct EMITTER *nxt, *prv;
-} EMrecord, *Emitter;
-
static Emitter emAnchor = NULL;
static str EMstartThread(Emitter em);
@@ -80,7 +56,7 @@ EMnew(str nme)
return em;
}
-static Emitter
+Emitter
EMfind(str nme)
{
Emitter r;
diff --git a/sql/backends/monet5/datacell/emitter.h
b/sql/backends/monet5/datacell/emitter.h
--- a/sql/backends/monet5/datacell/emitter.h
+++ b/sql/backends/monet5/datacell/emitter.h
@@ -27,6 +27,30 @@
/* #define _DEBUG_EMITTER_ */
#define EMout GDKout
+typedef struct EMITTER {
+ str name;
+ str host;
+ int port;
+ int mode; /* active/passive */
+ int protocol; /* event protocol UDP,TCP,CSV */
+ int bskt; /* connected to a basket */
+ int status;
+ int delay; /* control the delay between attempts to
connect */
+ int lck;
+ SOCKET sockfd;
+ SOCKET newsockfd;
+ stream *emitter;
+ str error;
+ MT_Id pid;
+ /* statistics */
+ timestamp lastseen;
+ int cycles; /* how often emptied */
+ int pending; /* pending events */
+ int sent;
+ Tablet table;
+ struct EMITTER *nxt, *prv;
+} EMrecord, *Emitter;
+
#ifdef WIN32
#ifndef LIBDATACELL
#define adapters_export extern __declspec(dllimport)
@@ -41,6 +65,7 @@ adapters_export str EMemitterStart(int *
adapters_export str EMemitterPause(int *ret, str *nme);
adapters_export str EMemitterResume(int *ret, str *nme);
adapters_export str EMemitterStop(int *ret, str *nme);
+adapters_export Emitter EMfind(str nme);
adapters_export str EMpause(int *ret);
adapters_export str EMresume(int *ret);
adapters_export str EMstop(int *ret);
diff --git a/sql/backends/monet5/datacell/receptor.c
b/sql/backends/monet5/datacell/receptor.c
--- a/sql/backends/monet5/datacell/receptor.c
+++ b/sql/backends/monet5/datacell/receptor.c
@@ -47,32 +47,6 @@
#define RCHOST "localhost"
#define RCPORT 55000
-typedef struct RECEPTOR {
- str name;
- str host;
- int port;
- int mode; /* active/passive */
- int protocol; /* event protocol UDP,TCP,CSV */
- int bskt; /* connected to a basket */
- int status;
- int delay; /* control the delay between attempts to connect */
- int lck;
- str scenario; /* use a scenario file */
- int sequence; /* repetition count */
- str modnme, fcnnme; /* generic receptor generators */
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list