Changeset: f0ff30527e5f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f0ff30527e5f
Modified Files:
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
Branch: trails
Log Message:
Handle more error scenarios.
diffs (truncated from 302 to 300 lines):
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -394,7 +394,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
msg =
createException(MAL,"cquery.analysis",SQLSTATE(3F000) "Too many stream table
columns\n");
} else if((msg =
BSKTregisterInternal(cntxt,mb,"tmp",alias,&bskt)) == MAL_SUCCEED) {
pnet[idx].baskets[j] = bskt;
- pnet[idx].inout[j] = STREAM_OUT;
+ pnet[idx].inout[j] = CQ_OUT;
}
}
return msg;
@@ -524,8 +524,8 @@ CQregister(Client cntxt, str sname, str
msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already
exists\n", ralias);
goto revertids;
}
- if((t = mvc_create_stream_table(m, tmp_schema,
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE, CA_COMMIT,
-
-1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+ if((t = mvc_create_stream_table(m, tmp_schema,
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE,
+
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE))
== NULL) {
msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal
stream table\n");
goto revertids;
}
@@ -536,7 +536,7 @@ CQregister(Client cntxt, str sname, str
goto revertids;
}
}
- msg = create_table_or_view(m, "tmp", ralias, t,
tt_stream_temp);
+ msg = create_table_or_view(m, "tmp", ralias, t,
SQL_LOCAL_TEMP_STREAM);
//msg = sql_grant_table_privs(m, "public",
PRIV_SELECT, "tmp", ralias, NULL, 0, USER_MONETDB);
}
revertids:
@@ -736,7 +736,7 @@ CQregister(Client cntxt, str sname, str
}
if(heartbeats != HEARTBEAT_NIL) {
for(i=0; i < MAXSTREAMS && pnet[pnettop].baskets[i]; i++) {
- if(baskets[pnet[pnettop].baskets[i]].window !=
DEFAULT_TABLE_WINDOW) {
+ if(pnet[idx].inout[i] == STREAM_IN &&
baskets[pnet[pnettop].baskets[i]].window != DEFAULT_TABLE_WINDOW) {
msg = createException(SQL, "cquery.register",
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
cleanBaskets(pnettop);
@@ -817,7 +817,7 @@ CQresume(str alias, int with_alter, lng
}
if(with_alter && heartbeats != HEARTBEAT_NIL) {
for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) {
- if(baskets[pnet[idx].baskets[j]].window !=
DEFAULT_TABLE_WINDOW) {
+ if(pnet[idx].inout[j] == STREAM_IN &&
baskets[pnet[pnettop].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
msg = createException(SQL, "cquery.resume",
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
goto unlock;
@@ -826,6 +826,10 @@ CQresume(str alias, int with_alter, lng
}
pnet[idx].status = CQWAIT;
+ if(pnet[idx].error) { //if there was an error registered, delete it
+ GDKfree(pnet[idx].error);
+ pnet[idx].error = MAL_SUCCEED;
+ }
if(with_alter) {
pnet[idx].cycles = cycles;
pnet[idx].beats = SET_HEARTBEATS(heartbeats);
@@ -856,8 +860,13 @@ CQresumeAll(void)
#endif
MT_lock_set(&ttrLock);
- for(i = 0 ; i < pnettop; i++)
+ for(i = 0 ; i < pnettop; i++) {
pnet[i].status = CQWAIT;
+ if(pnet[i].error) {
+ GDKfree(pnet[i].error);
+ pnet[i].error = MAL_SUCCEED;
+ }
+ }
/* start the scheduler if needed */
if(cq_pid == 0) {
@@ -1053,7 +1062,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb,
for( ; idx < last; idx++){
there_is_window_constraint = 0;
for(j=0; j < MAXSTREAMS && !there_is_window_constraint &&
pnet[idx].baskets[j]; j++) {
- if(baskets[pnet[idx].baskets[j]].window !=
DEFAULT_TABLE_WINDOW) {
+ if(pnet[idx].inout[j] == STREAM_IN &&
baskets[pnet[idx].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
there_is_window_constraint = 1;
}
}
@@ -1097,8 +1106,8 @@ CQwait(Client cntxt, MalBlkPtr mb, MalSt
str
CQderegister(Client cntxt, str alias)
{
- int idx = 0;
- str msg = MAL_SUCCEED, this_alias = NULL;
+ int idx = 0, i, j;
+ str msg = MAL_SUCCEED, this_alias = NULL, falias = NULL;
MT_Id myID = MT_getpid();
MT_lock_set(&ttrLock);
@@ -1111,6 +1120,23 @@ CQderegister(Client cntxt, str alias)
if(myID != cq_pid) {
pnet[idx].status = CQSTOP;
this_alias = pnet[idx].alias;
+ if(IS_UNION(pnet[idx].func)) {
+ for( i=0; i < pnettop && !falias; i++){
+ if(i != idx) {
+ for( j=0; j< MAXSTREAMS &&
pnet[i].baskets[j] && !falias; j++){
+ if( strcmp("tmp",
baskets[pnet[i].baskets[j]].table->s->base.name) == 0 &&
+ strcmp(this_alias,
baskets[pnet[i].baskets[j]].table->base.name) == 0 ) {
+ falias = pnet[i].alias;
+ }
+ }
+ }
+ }
+ if(falias) {
+ msg = createException(SQL, "cquery.deregister",
+
SQLSTATE(42000) "The output stream of this continuous query is being used by
%s\n", falias);
+ goto unlock;
+ }
+ }
MT_lock_unset(&ttrLock);
// actually wait if the query was running
// the CQ might get removed during the sleep calls, so we have
to make this check
@@ -1198,7 +1224,7 @@ CQdump(void *ret)
if( pnet[i].inout[0])
fprintf(stderr, " --> ");
for (k = 0; k < MAXSTREAMS && pnet[i].baskets[k]; k++)
- if( pnet[i].inout[k] == STREAM_OUT)
+ if( pnet[i].inout[k] == STREAM_OUT || pnet[i].inout[k] ==
CQ_OUT)
fprintf(stderr, "%s.%s ",
baskets[pnet[i].baskets[k]].table->s->base.name,
baskets[pnet[i].baskets[k]].table->base.name);
if (pnet[i].error)
fprintf(stderr, " errors:%s", pnet[i].error);
@@ -1253,8 +1279,9 @@ CQexecute( Client cntxt, int idx)
static void
CQscheduler(void *dummy)
{
- int i, j, k = -1, pntasks, delay = cycleDelay;
- Client cntxt = (Client) dummy;
+ int i, j, k = -1, pntasks, delay = cycleDelay, start_trans;
+ Client c = (Client) dummy;
+ mvc* m;
str msg = MAL_SUCCEED;
lng t, now;
timestamp aux;
@@ -1265,6 +1292,13 @@ CQscheduler(void *dummy)
fprintf(stderr, "#cquery.scheduler started\n");
#endif
+ if ((msg = getSQLContext(c, NULL, &m, NULL)) != MAL_SUCCEED) {
+ fprintf(stderr, "CQscheduler internal error: %s\n", msg);
+ GDKfree(msg);
+ MT_lock_set(&ttrLock);
+ goto terminate;
+ }
+
MT_lock_set(&ttrLock);
pnstatus = CQRUNNING; // global state
@@ -1279,10 +1313,12 @@ CQscheduler(void *dummy)
if((msg = MTIMEcurrent_timestamp(&aux)) != MAL_SUCCEED) {
fprintf(stderr, "CQscheduler internal error: %s\n",
msg);
GDKfree(msg);
+ goto terminate;
}
if((msg = MTIMEepoch2lng(&now, &aux)) != MAL_SUCCEED) {
fprintf(stderr, "CQscheduler internal error: %s\n",
msg);
GDKfree(msg);
+ goto terminate;
}
pntasks=0;
@@ -1292,8 +1328,13 @@ CQscheduler(void *dummy)
/* Queries are triggered by the heartbeat or
all window constraints */
/* A heartbeat in combination with a window
constraint is ambiguous */
/* At least one constraint should be set */
- if( pnet[i].beats == HEARTBEAT_NIL &&
pnet[i].baskets[0] == 0)
+ if( pnet[i].beats == HEARTBEAT_NIL &&
pnet[i].baskets[0] == 0) {
pnet[i].enabled = 0;
+ pnet[i].status = CQERROR;
+ pnet[i].error = createException(SQL,
"cquery.scheduler", SQLSTATE(3F000)
+ "Neither a heartbeat or a stream window
condition exists, this CQ cannot be triggered\n");
+ CQentry(i);
+ }
if( pnet[i].enabled && ((pnet[i].beats !=
HEARTBEAT_NIL && pnet[i].beats > 0) || pnet[i].run > 0)) {
pnet[i].enabled = now >= pnet[i].run +
(pnet[i].beats > 0 ? pnet[i].beats : 0);
#ifdef DEBUG_CQUERY_SCHEDULER
@@ -1344,7 +1385,7 @@ CQscheduler(void *dummy)
/* Execute each enabled transformation */
/* Tricky part is here a single stream used by multiple
transitions */
- for (i = 0; i < pnettop ; i++)
+ for (i = 0; i < pnettop; i++)
if( pnet[i].enabled){
delay = cycleDelay;
#ifdef DEBUG_CQUERY
@@ -1353,48 +1394,42 @@ CQscheduler(void *dummy)
t = GDKusec();
pnet[i].status = CQRUNNING;
if( !GDKexiting())
- CQexecute(cntxt, i);
-/*
- if (MT_create_thread(&pnet[i].tid, CQexecute,
(void*) (pnet+i), MT_THR_JOINABLE) < 0){
- msg=
createException(MAL,"petrinet.scheduler","Can not fork the thread");
- } else
-*/
+ CQexecute(c, i);
if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0)
{
pnet[i].cycles--;
- if(pnet[i].cycles == 0) { //if it was the last
cycle of the CQ, remove it
- CQfree(cntxt, i);
- i--;
- continue; //an entry was deleted, so
jump over!
- }
+ if(pnet[i].cycles == 0) //if it was the last
cycle of the CQ, remove it
+ pnet[i].status = CQDELETE;
+ }
+ if(pnet[i].status != CQDELETE) {
+ pnet[i].run = now;
/* last executed */
+ pnet[i].time = GDKusec() - t; /* keep around
in microseconds */
+ (void) MTIMEcurrent_timestamp(&pnet[i].seen);
+ pnet[i].enabled = 0;
+ CQentry(i);
}
- pnet[i].run = now; /* last
executed */
- pnet[i].time = GDKusec() - t; /* keep around in
microseconds */
- (void) MTIMEcurrent_timestamp(&pnet[i].seen);
- pnet[i].enabled = 0;
- CQentry(i);
+ }
+ start_trans = 0;
+ for (i = 0; i < pnettop ; i++) { //if there is a continuous
function to delete, we must start a transaction
+ if( pnet[i].status == CQDELETE &&
IS_UNION(pnet[i].func))
+ start_trans = 1;
+ }
+ if(start_trans) {
+ SQLtrans(m);
+ if (!m->sa)
+ m->sa = sa_create();
+ if (!m->sa) {
+ fprintf(stderr, "CQscheduler sql allocation
failure\n");
+ goto terminate;
+ }
}
for(i = pnettop - 1; i >= 0; i--) { //more defensive way to
stop continuous queries from the scheduler itself
- if( pnet[i].status == CQDELETE){
- CQfree(cntxt, i);
- }
+ if( pnet[i].status == CQDELETE)
+ CQfree(c, i);
}
+ if(start_trans)
+ SQLautocommit(m);
if( pnettop == 0)
pnstatus = CQSTOP;
- /* after one sweep all threads should be released */
-/*
- for (m = 0; m < k; m++)
- if(pnet[enabled[m]].tid){
-#ifdef DEBUG_CQUERY
- fprintf(stderr, "#Terminate query thread %s limit %d
\n", pnet[enabled[m]].fcnname, pnet[enabled[m]].limit);
-#endif
- MT_join_thread(pnet[enabled[m]].tid);
- }
-
-#ifdef DEBUG_CQUERY
- if (pnstatus == CQRUNNING && cycleDelay)
MT_sleep_ms(cycleDelay);
-#endif
- MT_sleep_ms(CQDELAY);
-*/
/* we should actually delay until the next heartbeat or
insertion into the streams */
if ((pntasks == 0 && pnstatus != CQSTOP) || pnstatus ==
CQPAUSE) {
#ifdef DEBUG_CQUERY
@@ -1410,10 +1445,11 @@ CQscheduler(void *dummy)
#ifdef DEBUG_CQUERY
fprintf(stderr, "#cquery.scheduler stopped\n");
#endif
+terminate:
pnstatus = CQINIT;
cq_pid = 0;
- SQLexitClient(cntxt);
- MCcloseClient(cntxt, CQ_CLIENT);
+ SQLexitClient(c);
+ MCcloseClient(c, CQ_CLIENT);
MT_lock_unset(&ttrLock);
}
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -41,6 +41,7 @@
#define STREAM_IN 1
#define STREAM_OUT 2
+#define CQ_OUT 3 /* Output stream in continuous functions */
typedef struct {
sql_func *func; /* The UDF to be called */
@@ -58,7 +59,6 @@ typedef struct {
lng beats; /* heart beat stride for procedures activations
-> must be in microseconds */
lng run; /* start at the CQ at that precise moment (UNIX
timestamp) -> must be in microseconds */
- /*MT_Id tid; Thread responsible */
timestamp seen; /* last time the query was seen by the scheduler */
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list