Changeset: ae4e6e2095ec for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae4e6e2095ec
Modified Files:
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
Add a private MAL client per continuous query
always make the errors table accessible
diffs (246 lines):
diff --git a/sql/backends/monet5/iot/50_iot.sql
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -94,9 +94,9 @@ create function iot.outputs()
returns table( "s" string, "t" string, "sch" string, "qry" string)
external name iot.outputplaces;
--- create function iot.errors()
--- returns table( "schema" string, "table" string, error string)
--- external name iot.errors;
+create function iot.errors()
+returns table( "table" string, error string)
+external name iot.errors;
-- tables for iotwebserver
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -74,8 +74,8 @@ BSKTclean(int idx)
baskets[idx].table_name = NULL;
BBPreclaim(baskets[idx].errors);
+ baskets[idx].errors = NULL;
baskets[idx].winstride = -1;
- baskets[idx].errors = NULL;
baskets[idx].count = 0;
}
for(idx = 1; idx < bsktTop; idx++){
@@ -85,8 +85,8 @@ BSKTclean(int idx)
baskets[idx].table_name = NULL;
BBPreclaim(baskets[idx].errors);
+ baskets[idx].errors = NULL;
baskets[idx].winstride = -1;
- baskets[idx].errors = NULL;
baskets[idx].count = 0;
}
}
@@ -607,15 +607,11 @@ BSKTdump(void *ret)
int bskt;
BUN cnt;
BAT *b;
- mvc *m = NULL;
str msg = MAL_SUCCEED;
mnstr_printf(GDKout, "#baskets table\n");
for (bskt = 1; bskt < bsktLimit; bskt++)
if (baskets[bskt].table_name) {
- msg = getSQLContext(mal_clients, 0, &m, NULL);
- if ( msg != MAL_SUCCEED)
- break;
cnt = 0;
b = baskets[bskt].bats[0];
if( b)
@@ -684,6 +680,48 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
}
str
+BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ int *res = getArgReference_int(stk, pci, 0);
+ str sname = *getArgReference_str(stk, pci, 2);
+ str tname = *getArgReference_str(stk, pci, 3);
+ str cname = *getArgReference_str(stk, pci, 4);
+ bat rows = *getArgReference_bat(stk, pci, 5);
+ bat val = *getArgReference_bat(stk, pci, 6);
+ BAT *bn=0, *rid=0, *bval = 0;
+ int bskt;
+
+ return 0;
+ (void) cntxt;
+ (void) mb;
+ *res = 0;
+
+ rid = BATdescriptor(rows);
+ if( rid == NULL)
+ throw(SQL, "basket.append", "Cannot access source oid descriptor");
+ bval = BATdescriptor(val);
+ if( bval == NULL){
+ BBPunfix(rid->batCacheid);
+ throw(SQL, "basket.append", "Cannot access source descriptor");
+ }
+
+ bskt = BSKTlocate(sname,tname);
+ if( bskt == 0)
+ throw(SQL, "basket.append", "Cannot access basket descriptor
%s.%s",sname,tname);
+ bn = BSKTbindColumn(sname,tname,cname);
+
+ if( bn){
+ void_replace_bat(bn, rid, bval, TRUE);
+ BATderiveProps(bn, FALSE);
+ } else throw(SQL, "basket.append", "Cannot access target column
%s.%s.%s",sname,tname,cname);
+
+ baskets[bskt].status = BSKTFILLED;
+ BBPunfix(rid->batCacheid);
+ BBPunfix(bval->batCacheid);
+ return MAL_SUCCEED;
+}
+
+str
BSKTreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
lng *res = getArgReference_lng(stk, pci, 0);
@@ -894,7 +932,7 @@ BSKTtableerrors(bat *nameId, bat *errorI
}
for (i = 1; i < bsktTop; i++)
- if (BATcount(baskets[i].errors) > 0) {
+ if (baskets[i].errors && BATcount(baskets[i].errors) > 0) {
bi = bat_iterator(baskets[i].errors);
BATloop(baskets[i].errors, p, q)
{
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -89,6 +89,7 @@ iot_export int BSKTlocate(str sch, str t
iot_export int BSKTunlocate(str sch, str tbl);
iot_export int BSKTlocate(str sch, str tbl);
iot_export str BSKTappend(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTimportInternal(Client cntxt, int bskt);
iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -171,12 +171,20 @@ static void
IOTreceptorThread(void *dummy)
{
int idx = *(int*)dummy;
+ Client cntxt = MCinitClient(0, mal_clients[0].fdin,
mal_clients[0].fdout);
+
+ if( cntxt == NULL)
+ return;
+ //SQLinitClient(cntxt);
_DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n",
baskets[idx].schema_name,
baskets[idx].table_name,
baskets[idx].source);
/* continously scan the container for baskets */
- BSKTimportInternal(mal_clients, idx);
+ BSKTimportInternal(cntxt, idx);
+ _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the
file\n",
+ baskets[idx].schema_name,
+ baskets[idx].table_name);
}
str
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -62,6 +62,7 @@ typedef struct {
str fcnname;
MalBlkPtr mb; /* Query block */
MalStkPtr stk; /* might be handy */
+ Client client; /* MAL client context for this query */
int status; /* query status waiting/running/paused */
int enabled; /* all baskets are available */
@@ -164,6 +165,13 @@ PNregisterInternal(Client cntxt, MalBlkP
pnet[pnettop].mb = nmb;
pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
+ pnet[pnettop].client = MCinitClient(0,0,0);
+ if ( pnet[pnettop].client == NULL)
+ throw(MAL,"petrinet.register","Failed to create client record
for continous query");
+ msg = SQLinitClient(pnet[pnettop].client);
+ if( msg)
+ return msg;
+
pnet[pnettop].status = PNWAIT;
pnet[pnettop].cycles = 0;
pnet[pnettop].seen = *timestamp_nil;
@@ -238,12 +246,15 @@ PNstop(void){
int i,cnt;
_DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n");
- pnstatus = PNSTOP;
+ pnstatus = PNSTOP; // avoid starting new continuous queries
+ for(cnt=0, i = 0; i < pnettop; i++)
+ if( pnet[i].client )
+ pnet[i].client->itrace ='x';
+
do{
MT_sleep_ms(20);
- for(cnt=0, i = 0; i < pnettop; i++){
- cnt += pnet[i].status == PNRUNNING;
- }
+ for(cnt=0, i = 0; i < pnettop; i++)
+ cnt += pnet[i].status != PNWAIT;
} while(cnt);
BSKTclean(0);
_DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n");
@@ -271,6 +282,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
}
GDKfree(pnet[i].modname);
GDKfree(pnet[i].fcnname);
+ MCcloseClient(pnet[i].client);
for( ; i <pnettop-1;i++)
pnet[i]= pnet[i+1];
memset((void*) (pnet+i), 0, sizeof(PNnode));
@@ -282,6 +294,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
for ( i = 0; i < pnettop; i++){
GDKfree(pnet[i].modname);
GDKfree(pnet[i].fcnname);
+ MCcloseClient(pnet[i].client);
memset((void*) (pnet+i), 0, sizeof(PNnode));
}
pnettop = 0;
@@ -395,7 +408,7 @@ PNexecute( void *n)
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
locked\n",node->modname, node->fcnname);
- msg = runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
+ msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0);
_DEBUG_PETRINET_
mnstr_printf(PNout, "#petrinet.execute %s.%s transition
done:%s\n",
@@ -425,8 +438,16 @@ PNscheduler(void *dummy)
timestamp ts, tn;
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
- cntxt = mal_clients; /* run as admin in SQL mode*/
- if( strcmp(cntxt->scenario, "sql") )
+ cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/
+ if( cntxt){
+ if( SQLinitClient(cntxt) != MAL_SUCCEED)
+ GDKerror("Could not initialize PNscheduler");
+ }else{
+ GDKerror("Could not initialize PNscheduler");
+ return;
+ }
+
+ if( cntxt->scenario == NULL || strcmp(cntxt->scenario, "sql") )
SQLinitEnvironment(cntxt, NULL, NULL, NULL);
pnstatus = PNRUNNING; // global state
@@ -545,6 +566,7 @@ PNscheduler(void *dummy)
}
}
+ MCcloseClient(cntxt);
pnstatus = PNINIT;
_DEBUG_PETRINET_ mnstr_flush(PNout);
(void) dummy;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list