Changeset: d473b263b78d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d473b263b78d
Added Files:
sql/backends/monet5/51_basket.mal
sql/backends/monet5/Tests/basket00.malC
sql/backends/monet5/sql_basket.c
sql/backends/monet5/sql_basket.h
Removed Files:
sql/backends/monet5/timetrails/basket.c
sql/backends/monet5/timetrails/basket.h
sql/backends/monet5/timetrails/basket.mal
Modified Files:
sql/backends/monet5/Makefile.ag
sql/backends/monet5/Tests/All
sql/backends/monet5/Tests/cquery10.sql
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
Branch: timetrails
Log Message:
First pass over the baskets
diffs (truncated from 1424 to 300 lines):
diff --git a/sql/backends/monet5/timetrails/basket.mal
b/sql/backends/monet5/51_basket.mal
rename from sql/backends/monet5/timetrails/basket.mal
rename to sql/backends/monet5/51_basket.mal
--- a/sql/backends/monet5/timetrails/basket.mal
+++ b/sql/backends/monet5/51_basket.mal
@@ -17,18 +17,13 @@
module basket;
-pattern register(mvc:int,sch:str,tbl:str, role:int):int
-address BSKTregister
-comment "Initialize a new basket based on a specific table definition.
-roles:input =0, output=2";
-
pattern tid(mvc:int,sch:str,tbl:str):bat[:oid]
address BSKTtid
comment "Collect the candidates for a basket";
pattern bind(mvc:int,sch:str,tbl:str,col:str):bat[:any]
address BSKTbind
-comment "Access the stream basket column";
+comment "Access the basket column, if needed add to basket catalog";
pattern append(mvc:int, sch:str, tbl:str, col:str, val:any):int
address BSKTappend
@@ -45,14 +40,22 @@ comment "Remove tuples from a basket";
pattern clear_table(sname:str,tname:str):lng
address mvc_clear_table_wrap;
+pattern setwindow(sch:str, tbl:str, elm:int):int
+address BSKTsetwindow
+comment "Set window size";
+
+pattern window(mvc:any, sch:str, tbl:str):int
+address BSKTwindow
+comment "Apply window selection";
+
+pattern settumble(sch:str, tbl:str, elm:int):int
+address BSKsettumble
+comment "Set stride";
+
pattern tumble(mvc:any, sch:str, tbl:str):int
address BSKTtumble
comment "Apply tumbling to the basket";
-pattern settumble(sch:str, tbl:str, n:int):void
-address BSKTsettumble
-comment "Make the basket available for the scheduler";
-
unsafe pattern lock(mvc:any, sch:str, tbl:str):int
address BSKTlock
comment "Lock the basket for private use";
@@ -69,10 +72,14 @@ pattern reset(mvc:int,sch:str,tbl:str):i
address BSKTreset
comment "Remove a basket content";
-command drop(mvc:int,sch:str,tbl:str):int
+pattern drop(mvc:int,sch:str,tbl:str):int
address BSKTdrop
comment "Remove the basket";
-command errors() (l:bat[:str],m:bat[:str])
-address BSKTtableerrors
-comment "Return the table with all errors";
+pattern status()
(seen:bat[:timestamp],sch:bat[:str],tbl:bat[:str],events:bat[:int],cycles:bat[:int],
error:bat[:str])
+address BSKTstatus
+comment "Show the status of the baskets";
+
+command dump()
+address BSKTdump
+comment "Show the baskets table";
diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag
--- a/sql/backends/monet5/Makefile.ag
+++ b/sql/backends/monet5/Makefile.ag
@@ -45,6 +45,7 @@ lib__sql = {
sql_round.c sql_round_impl.h sql_bat2time.c \
sql_fround.c sql_fround_impl.h \
sql_orderidx.c sql_orderidx.h \
+ sql_basket.c sql_basket.h \
sql_cquery.c sql_cquery.h \
sql_rank.c sql_rank.h
LIBS = ../../server/libsqlserver \
@@ -79,7 +80,7 @@ headers_mal_hge = {
headers_autoload = {
HEADERS = mal
DIR = libdir/monetdb5/autoload
- SOURCES = 40_sql.mal 50_cquery.mal
+ SOURCES = 40_sql.mal 51_basket.mal 50_cquery.mal
}
headers_autoload_hge = {
diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -77,6 +77,8 @@ shutdown
HAVE_HGE?int_notation_1e5
+basket00
+
cquery00
cquery05
cquery10
diff --git a/sql/backends/monet5/Tests/basket00.malC
b/sql/backends/monet5/Tests/basket00.malC
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/basket00.malC
@@ -0,0 +1,1 @@
+#simple test for basket management
diff --git a/sql/backends/monet5/Tests/cquery10.sql
b/sql/backends/monet5/Tests/cquery10.sql
--- a/sql/backends/monet5/Tests/cquery10.sql
+++ b/sql/backends/monet5/Tests/cquery10.sql
@@ -14,11 +14,11 @@ call cquery.register('sys','cq_cycles');
call cquery.heartbeat('sys','cq_cycles',1000);
-- The scheduler executes all CQ at most 5 rounds
-call cquery.cycles('sys','cq_cycles',5);
+call cquery.cycles('sys','cq_cycles',3);
-- reactivate all continuous queries
call cquery.resume();
-call cquery.wait(2000);
+call cquery.wait(4000);
call cquery.pause();
select 'RESULT';
diff --git a/sql/backends/monet5/timetrails/basket.c
b/sql/backends/monet5/sql_basket.c
rename from sql/backends/monet5/timetrails/basket.c
rename to sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/timetrails/basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -28,24 +28,34 @@
#include "monetdb_config.h"
#include <unistd.h>
#include "gdk.h"
-#include "timetrails.h"
-#include "basket.h"
-#include "petrinet.h"
+#include "sql_cquery.h"
+#include "sql_basket.h"
#include "mal_exception.h"
#include "mal_builder.h"
#include "opt_prelude.h"
-#ifdef _MSC_VER
-#define access(file, mode) _access(file, mode)
-#endif
-
#define _DEBUG_BASKET_ if(0)
str statusname[3] = { "<unknown>", "waiting", "filled" };
-BasketRec *baskets; /* the global timetrails catalog */
+static BasketRec *baskets; /* the global timetrails catalog */
static int bsktTop = 0, bsktLimit = 0;
+// locate the basket in the basket catalog
+static int
+BSKTlocate(str sch, str tbl)
+{
+ int i;
+
+ if( sch == 0 || tbl == 0)
+ return 0;
+ for (i = 1; i < bsktTop; i++)
+ if (baskets[i].schema && strcmp(sch, baskets[i].schema) == 0 &&
+ baskets[i].table && strcmp(tbl, baskets[i].table) == 0)
+ return i;
+ return 0;
+}
+
// Find an empty slot in the basket catalog
static int BSKTnewEntry(void)
{
@@ -55,17 +65,19 @@ static int BSKTnewEntry(void)
if (bsktLimit == 0) {
bsktLimit = MAXBSKT;
baskets = (BasketRec *) GDKzalloc(bsktLimit *
sizeof(BasketRec));
+ if( baskets == 0)
+ return 0;
bsktTop = 1; /* entry 0 is used as non-initialized */
} else if (bsktTop + 1 == bsktLimit) {
- bnew = (BasketRec *) GDKzalloc((bsktLimit+MAXBSKT) *
sizeof(BasketRec));
- memcpy((char*) bnew, (char*) baskets, bsktLimit *
sizeof(BasketRec));
+ bnew = (BasketRec *) GDKrealloc(baskets, (bsktLimit+MAXBSKT) *
sizeof(BasketRec));
+ if( bnew == 0)
+ return 0;
bsktLimit += MAXBSKT;
- GDKfree(baskets);
baskets = bnew;
}
for (i = 1; i < bsktLimit; i++) { /* find an available slot */
- if (baskets[i].table_name == NULL)
+ if (baskets[i].table == NULL)
break;
}
if(i >= bsktTop) { /* if it's the last one we need to increment bsktTop
*/
@@ -75,49 +87,29 @@ static int BSKTnewEntry(void)
return i;
}
-// free all malloced space
+// free a basket structure
void
BSKTclean(int idx)
-{
- if( idx){
- GDKfree(baskets[idx].schema_name);
- GDKfree(baskets[idx].table_name);
- baskets[idx].schema_name = NULL;
- baskets[idx].table_name = NULL;
-
- BBPreclaim(baskets[idx].errors);
- baskets[idx].errors = NULL;
- baskets[idx].winstride = -1;
- baskets[idx].count = 0;
- }
- for(idx = 1; idx < bsktTop; idx++){
- GDKfree(baskets[idx].schema_name);
- GDKfree(baskets[idx].table_name);
- baskets[idx].schema_name = NULL;
- baskets[idx].table_name = NULL;
+{ int i;
- BBPreclaim(baskets[idx].errors);
- baskets[idx].errors = NULL;
- baskets[idx].winstride = -1;
+ if( idx){
+ GDKfree(baskets[idx].schema);
+ GDKfree(baskets[idx].table);
+ GDKfree(baskets[idx].error);
+ baskets[idx].schema = NULL;
+ baskets[idx].table = NULL;
+ baskets[idx].error = NULL;
baskets[idx].count = 0;
+ baskets[idx].events = 0;
+ baskets[idx].cycles = 0;
+ baskets[idx].seen = *timestamp_nil;
+ for(i=0; baskets[idx].bats[i]; i++){
+ BBPunfix(baskets[idx].bats[i]->batCacheid);
+ baskets[idx].bats[i] =0;
+ }
}
}
-// locate the basket in the catalog
-int
-BSKTlocate(str sch, str tbl)
-{
- int i;
-
- if( sch == 0 || tbl == 0)
- return 0;
- for (i = 1; i < bsktTop; i++)
- if (baskets[i].schema_name && strcmp(sch,
baskets[i].schema_name) == 0 &&
- baskets[i].table_name && strcmp(tbl,
baskets[i].table_name) == 0)
- return i;
- return 0;
-}
-
// Instantiate a basket description for a particular stream table
static str
BSKTnewbasket(mvc *m, sql_schema *s, sql_table *t)
@@ -129,38 +121,34 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
// Don't introduce the same basket twice
if( BSKTlocate(s->base.name, t->base.name) > 0)
return MAL_SUCCEED;
+
if( !isStream(t))
throw(MAL,"basket.register","Only allowed for stream tables");
MT_lock_set(&ttrLock);
idx = BSKTnewEntry();
- baskets[idx].schema_name = GDKstrdup(s->base.name);
- baskets[idx].table_name = GDKstrdup(t->base.name);
+ baskets[idx].schema = GDKstrdup(s->base.name);
+ baskets[idx].table = GDKstrdup(t->base.name);
(void) MTIMEcurrent_timestamp(&baskets[idx].seen);
- baskets[idx].count = 0;
-
- baskets[idx].winstride = -1; /* all tuples are removed */
-
// Check the column types first
- for (o = t->columns.set->h; o && colcnt <MAXCOLS; o = o->next){
+ for (o = t->columns.set->h; o && colcnt <MAXCOLS-1; o = o->next){
sql_column *col = o->data;
int tpe = col->type.type->localtype;
if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime ||
tpe == TYPE_timestamp) ){
MT_lock_unset(&ttrLock);
- throw(MAL,"baskets.register","Unsupported type
%d\n",tpe);
+ throw(MAL,"basket.register","Unsupported type
%d\n",tpe);
}
colcnt++;
}
- if( colcnt == MAXCOLS){
+ if( colcnt == MAXCOLS-1){
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list