Changeset: 02531371b912 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=02531371b912
Modified Files:
monetdb5/mal/mal_interpreter.mx
sql/backends/monet5/datacell/basket.mx
sql/backends/monet5/datacell/datacell.mx
sql/backends/monet5/datacell/emitter.mx
sql/backends/monet5/datacell/petrinet.mx
sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:
Using relational tables as baskets
A step towards SQL/datacell processing. Use the datacell schema
as the default for baskets, but also allow arbitrary tables to
be designated as such.
diffs (truncated from 1536 to 300 lines):
diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -1233,7 +1233,7 @@
MT_lock_set(&flow->todo->l, "q_enqueue");
for (i = 0; i < todo; i++)
if (flow->status[i].blocks == 0) {
- p = getInstrPtr(fs[0].mb, i);
+ p = getInstrPtr(fs[0].mb, flow->start + i );
for (j = p->retc; j < p->argc; j++)
flow->status[i].argclaim +=
getMemoryClaim(flow->status[0].mb, flow->status[0].stk, p, j, FALSE);
queued++;
diff --git a/sql/backends/monet5/datacell/basket.mx b/sql/backends/monet5/datacell/basket.mx
--- a/sql/backends/monet5/datacell/basket.mx
+++ b/sql/backends/monet5/datacell/basket.mx
@@ -17,125 +17,89 @@
@f basket
@- Event baskets
Continuous query processing relies on event baskets
-passed through a processing pipeline. Each stream is
-represented with a BAT whose header contains a usec tag.
-All event components received at the same receptor will
-be tagged with the same key.
+passed through a processing pipeline. The baskets
+are derived from ordinary SQL tables where the delta
+processing is ignored.
-Query processing follows the following steps:
-1) gain access to a series of baskets,locking them for exclusive use
-2) take a snapshot of its content and release the locks
-3) process the query using the snapshots
-4) lock the baskets again
-5) drop elements from the basket you have seen/used.
-6) unlock the baskets
+@mal
+module datacell;
-The baskets form a simple synchronization point. They
-are controlled by a group based locking scheme.
-Steps 2) and 5) can be taken as a single action, i.e.
-remove the previous snapshot and collect the new ones.
+pattern register_basket(schema:str,tab:str):void
+address DCregister_basket
+comment "Initialize a new basket based on a specific table definition in the datacell schema");
-The baskets are organized into groups, i.e. they represent
-a relational table.
-@mal
-module basket;
+pattern register_basket(tab:str):void
+address DCregister_basket
+comment "Initialize a new basket based on a specific table definition in the datacell schema");
-pattern new(nme:str,b:bat[:any_1,:any_2]):bat[:any_1,:any_2]
-address BSKTnew
-comment "Create a new event basket";
+pattern register_basket():void
+address DCregister_all
+comment "Create basket definitions for all tables in the datacell schema";
-pattern group(nme:str,components:str...):void
-address BSKTgroup
-comment "Create a basket group";
+command inventory():bat[:str,:bat]
+address DCinventory
+comment "Produce a tabular view of the baskets identified";
-command drop(grp:str,nme:str):void
-address BSKTdrop
-comment "Remove a basket from a group";
-command drop(grp:str):void
-address BSKTdropGroup
-comment "Remove a basket group";
-
-pattern bind(grp:str,nme:str) :bat[:any_1,:any_2]
-address BSKTbind
-comment "Locate the designated stream and provide access
-to its base and private snapshot.";
-
-command delete(nme:str,piv:bat[:any_1,:any_3]):void
-address BSKTdelete
-comment "Remove the events identified by their tag from their basket";
-
-command update(nme:str,del:bat[:any_1,:any_3]):bat[:any_1,:any_3]
-address BSKTupdate
-comment "Remove the events identified from a basket and return a snapshot
-of the newly arrived events";
-
-command lock(grp:str,delay:int):void
-address BSKTlock
+command lock(schema:str, tbl:str,delay:int):void
+address DClock
comment "Lock a basket group and spinlock upon need";
-command lock(grp:str):void
-address BSKTlock2
+command lock(schema:str, tbl:str):void
+address DClock2
comment "Lock a basket group ";
-command unlock(grp:str):void
-address BSKTunlock
+command unlock(schema:str, tbl:str):void
+address DCunlock
comment "Unlock the basket";
+
@{
@h
-#ifndef _BASKETS_
-#define _BASKETS_
-
-/* #define _DEBUG_BASKET debug this module */
-#define BSKTout GDKout
+#ifndef _DATACELLS_
+#define _DATACELLS_
#include "monetdb_config.h"
#include "sql.h"
-#include "mal.h"
#include "mal_interpreter.h"
-@-
-Each group has a single lock structure and possibly shared
-properties.
-@h
-typedef struct GROUPLOCK{
- MT_Lock lock;
-} *GroupLock;
-
-typedef struct{
- GroupLock lock;
- str name;
- str grp;
- BAT *primary;
-} *Basket, BasketRec;
-
#ifdef WIN32
#ifndef LIBCONTAINERS
-#define containers_export extern __declspec(dllimport)
+#define datacell_export extern __declspec(dllimport)
#else
-#define containers_export extern __declspec(dllexport)
+#define datacell_export extern __declspec(dllexport)
#endif
#else
-#define containers_export extern
+#define datacell_export extern
#endif
-containers_export str BSKTlock(int *ret, str *nme, int *delay);
-containers_export str BSKTlock2(int *ret, str *grp);
-containers_export str BSKTunlock(int *ret, str *nme);
-containers_export str BSKTnew(Client cntxt, MalBlkPtr mk, MalStkPtr stk, InstrPtr pc);
-containers_export str BSKTbind(Client cntxt, MalBlkPtr mk, MalStkPtr stk, InstrPtr pc);
-containers_export str BSKTgroup(Client cntxt, MalBlkPtr mk, MalStkPtr stk, InstrPtr pc);
-containers_export str BSKTdrop(int *ret, str *grp, str *nme);
-containers_export str BSKTdropGroup(int *ret, str *grp);
-containers_export str BSKTdelete(int *ret, str *grp, str *nme, int *del);
-containers_export str BSKTfind(int *ret, str *grp, str *nme);
-containers_export int BSKTlocateGroup(str grp);
-containers_export int BSKTmembers(str grp, str *member, int limit);
-containers_export int BSKTmemberCount(str grp);
-containers_export str BSKTmemberName(str grp,int idx);
-containers_export str BSKTupdate(int *ret, int *bsk, int *upd);
-containers_export BasketRec *baskets;
-containers_export int bsktTop, bsktLimit;
-containers_export lng usec(void);
+/* #define _DEBUG_DATACELL debug this module */
+#define DCout GDKout
+#define MAXCOL 128
+#define MAXBSK 64
+
+typedef struct{
+ MT_Lock lock;
+ str schema; /* lives in the datacell schema by default */
+ str name; /* table that represents the basket */
+ int colcount;
+ str *cols;
+ BAT **primary;
+ /* statistics */
+} *DCbasket, DCbasketRec;
+
+datacell_export str schema_default;
+datacell_export str DCregister_all( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+datacell_export str DCregister_basket( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+datacell_export str DCinventory(int *ret);
+datacell_export int DCmemberCount(str schema, str tbl);
+datacell_export int DClocate(str schema, str tbl);
+
+datacell_export str DClock(int *ret, str *schema, str *tbl, int *delay);
+datacell_export str DCunlock(int *ret, str *schema, str *tbl);
+datacell_export str DClock2(int *ret, str *schema, str *tbl);
+
+datacell_export DCbasketRec *baskets;
+datacell_export int bsktTop, bsktLimit;
+datacell_export lng usec(void);
#endif
@c
#include "basket.h"
@@ -143,12 +107,14 @@
#include "winsock2.h"
#endif
-BasketRec *baskets;
+str schema_default= "datacell";
+
+DCbasketRec *baskets; /* the datacell catalog */
int bsktTop, bsktLimit;
static MT_Lock bsktLock;
-#define lockBasketCatalog() mal_set_lock(bsktLock,"basket");
-#define unlockBasketCatalog() mal_unset_lock(bsktLock,"basket");
+#define lockDCbasketCatalog() mal_set_lock(bsktLock,"basket");
+#define unlockDCbasketCatalog() mal_unset_lock(bsktLock,"basket");
@-
We have to obtain the precise wall-clock time
@@ -163,278 +129,194 @@
return ((lng) tp.tv_sec ) * LL_CONSTANT(1000000) + (lng) tp.tv_usec;
}
-
-static int BSKTnewEntry(){
+static int DCnewEntry(){
if(bsktLimit == 0){
- bsktLimit = 128;
- baskets= (BasketRec *) GDKzalloc(bsktLimit * sizeof(BasketRec));
+ bsktLimit = MAXBSK;
+ baskets= (DCbasketRec *) GDKzalloc(bsktLimit * sizeof(DCbasketRec));
MT_lock_init(&bsktLock,"basket");
bsktTop=1; /* entry 0 is used as non-initialized */
} else
if( bsktTop == bsktLimit){
- bsktLimit +=128;
- baskets= (BasketRec *) GDKrealloc(baskets, bsktLimit * sizeof(BasketRec));
+ bsktLimit +=MAXBSK;
+ baskets= (DCbasketRec *) GDKrealloc(baskets, bsktLimit * sizeof(DCbasketRec));
}
return bsktTop++;
}
-static int
-BSKTlocate(str grp, str nme){
+int
+DClocate(str schema, str tbl){
int i;
for(i=1; i< bsktTop; i++)
- if(grp && nme &&
- baskets[i].name && strcmp(nme, baskets[i].name)==0 &&
- baskets[i].grp && strcmp(grp, baskets[i].grp)==0 )
+ if( tbl && baskets[i].name && strcmp(tbl, baskets[i].name)==0 &&
+ (schema == 0 || (baskets[i].schema && strcmp(schema, baskets[i].schema) ==0) ) )
return i;
return 0;
}
-int
-BSKTlocateGroup(str grp){
- int i;
- for(i=1; i< bsktTop; i++)
- if( grp && baskets[i].grp && strcmp(grp, baskets[i].grp)==0 )
- return i;
- return 0;
-}
+static str
+DCnewbasket(sql_schema *s, sql_table *t, sql_trans *tr)
+{
+ int idx, i;
+ node *o;
+ str msg= MAL_SUCCEED;;
+ BAT *b;
+ sql_column *c;
-str BSKTfind(int *bid, str *grp, str *nme){
- int i;
- i= BSKTlocate(*grp,*nme);
- if( i==0)
- throw(MAL,"basket.find","Could not find basket");
- assert(i>0 && i<bsktTop);
- if(baskets[i].primary == 0)
- throw(MAL,"basket.find","Basket not initialized");
- *bid= baskets[i].primary->batCacheid;
- BBPincref(*bid,TRUE);
+ mal_set_lock(mal_contextLock, "register_basket");
+ idx= DCnewEntry();
+ MT_lock_init(&baskets[idx].lock,"register_basket");
+ baskets[idx].schema= GDKstrdup(s->base.name);
+ baskets[idx].name= GDKstrdup(t->base.name);
+
+ baskets[idx].colcount = 0;
+ for (o = t->columns.set->h; o; o = o->next) baskets[idx].colcount++;
+ baskets[idx].cols= GDKzalloc((baskets[idx].colcount + 1) * sizeof
(str));
+ baskets[idx].primary= GDKzalloc((baskets[idx].colcount + 1) * sizeof
(BAT *));
+
+ i = 0;
+ for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) {
+ c = o->data;
+ b = store_funcs.bind_col(tr, c, 0);
+ if (b == NULL) {
+ mal_unset_lock(mal_contextLock, "register_basket");
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list