Changeset: 02531371b912 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=02531371b912
Modified Files:
        monetdb5/mal/mal_interpreter.mx
        sql/backends/monet5/datacell/basket.mx
        sql/backends/monet5/datacell/datacell.mx
        sql/backends/monet5/datacell/emitter.mx
        sql/backends/monet5/datacell/petrinet.mx
        sql/backends/monet5/datacell/receptor.mx
Branch: default
Log Message:

Using relational tables as baskets
A step towards SQL/datacell processing. Use the datacell schema
as the default for baskets, but also allow arbitrary tables to
be designated as such.


diffs (truncated from 1536 to 300 lines):

diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -1233,7 +1233,7 @@
        MT_lock_set(&flow->todo->l, "q_enqueue");
        for (i = 0; i < todo; i++)
                if (flow->status[i].blocks == 0) {
-                       p = getInstrPtr(fs[0].mb, i);
+                       p = getInstrPtr(fs[0].mb, flow->start + i );
                        for (j = p->retc; j < p->argc; j++)
                                flow->status[i].argclaim += 
getMemoryClaim(flow->status[0].mb, flow->status[0].stk, p, j, FALSE);
                        queued++;
diff --git a/sql/backends/monet5/datacell/basket.mx b/sql/backends/monet5/datacell/basket.mx
--- a/sql/backends/monet5/datacell/basket.mx
+++ b/sql/backends/monet5/datacell/basket.mx
@@ -17,125 +17,89 @@
 @f basket
 @- Event baskets
 Continuous query processing relies on event baskets 
-passed through a processing pipeline. Each stream is
-represented with a BAT whose header contains a usec tag.
-All event components received at the same receptor will
-be tagged with the same key. 
+passed through a processing pipeline. The baskets
+are derived from ordinary SQL tables where the delta
+processing is ignored.
 
-Query processing follows the following steps:
-1) gain access to a series of baskets,locking them for exclusive use
-2) take a snapshot of its content and release the locks
-3) process the query using the snapshots
-4) lock the baskets again
-5) drop elements from the basket you have seen/used.
-6) unlock the baskets
+@mal
+module datacell;
 
-The baskets form a simple synchronization point. They
-are controlled by a group based locking scheme.
-Steps 2) and 5) can be taken as a single action, i.e.
-remove the previous snapshot and collect the new ones.
+pattern register_basket(schema:str,tab:str):void
+address DCregister_basket
+comment "Initialize a new basket based on a specific table definition in the 
datacell schema");
 
-The baskets are organized into groups, i.e. they represent
-a relational table.
-@mal
-module basket;
+pattern register_basket(tab:str):void
+address DCregister_basket
+comment "Initialize a new basket based on a specific table definition in the 
datacell schema");
 
-pattern new(nme:str,b:bat[:any_1,:any_2]):bat[:any_1,:any_2]
-address BSKTnew
-comment "Create a new event basket";
+pattern register_basket():void
+address DCregister_all
+comment "Create basket definitions for all tables in the datacell schema";
 
-pattern group(nme:str,components:str...):void
-address BSKTgroup
-comment "Create a basket group";
+command inventory():bat[:str,:bat]
+address DCinventory
+comment "Produce a tabular view of the baskets identified";
 
-command drop(grp:str,nme:str):void
-address BSKTdrop
-comment "Remove a basket from a group";
-command drop(grp:str):void
-address BSKTdropGroup
-comment "Remove a basket group";
-
-pattern bind(grp:str,nme:str) :bat[:any_1,:any_2]
-address BSKTbind
-comment "Locate the designated stream and provide access
-to its base and private snapshot.";
-
-command delete(nme:str,piv:bat[:any_1,:any_3]):void
-address BSKTdelete
-comment "Remove the events identified by their tag from their basket";
-
-command update(nme:str,del:bat[:any_1,:any_3]):bat[:any_1,:any_3]
-address BSKTupdate
-comment "Remove the events identified from a basket and return a snapshot
-of the newly arrived events";
-
-command lock(grp:str,delay:int):void
-address BSKTlock
+command lock(schema:str, tbl:str,delay:int):void
+address DClock
 comment "Lock a basket group and spinlock upon need";
-command lock(grp:str):void
-address BSKTlock2
+command lock(schema:str, tbl:str):void
+address DClock2
 comment "Lock a basket group ";
 
-command unlock(grp:str):void
-address BSKTunlock
+command unlock(schema:str, tbl:str):void
+address DCunlock
 comment "Unlock the basket";
+
 @{
 @h
-#ifndef _BASKETS_
-#define _BASKETS_
-
-/* #define _DEBUG_BASKET     debug this module */
-#define BSKTout GDKout
+#ifndef _DATACELLS_
+#define _DATACELLS_
 
 #include "monetdb_config.h"
 #include "sql.h"
-#include "mal.h"
 #include "mal_interpreter.h"
 
-@-
-Each group has a single lock structure and possibly shared
-properties.
-@h
-typedef struct GROUPLOCK{
-       MT_Lock lock;
-} *GroupLock;
-
-typedef struct{
-       GroupLock lock;
-       str name;
-       str grp;
-       BAT *primary;
-} *Basket, BasketRec;
-
 
 #ifdef WIN32
 #ifndef LIBCONTAINERS
-#define containers_export extern __declspec(dllimport)
+#define datacell_export extern __declspec(dllimport)
 #else
-#define containers_export extern __declspec(dllexport)
+#define datacell_export extern __declspec(dllexport)
 #endif
 #else
-#define containers_export extern
+#define datacell_export extern
 #endif
 
-containers_export str BSKTlock(int *ret, str *nme, int *delay);
-containers_export str BSKTlock2(int *ret, str *grp);
-containers_export str BSKTunlock(int *ret, str *nme);
-containers_export str BSKTnew(Client cntxt, MalBlkPtr mk, MalStkPtr stk, 
InstrPtr pc);
-containers_export str BSKTbind(Client cntxt, MalBlkPtr mk, MalStkPtr stk, 
InstrPtr pc);
-containers_export str BSKTgroup(Client cntxt, MalBlkPtr mk, MalStkPtr stk, 
InstrPtr pc);
-containers_export str BSKTdrop(int *ret, str *grp, str *nme);
-containers_export str BSKTdropGroup(int *ret, str *grp);
-containers_export str BSKTdelete(int *ret, str *grp, str *nme, int *del);
-containers_export str BSKTfind(int *ret, str *grp, str *nme);
-containers_export int BSKTlocateGroup(str grp);
-containers_export int BSKTmembers(str grp, str *member, int limit);
-containers_export int BSKTmemberCount(str grp);
-containers_export str BSKTmemberName(str grp,int idx);
-containers_export str BSKTupdate(int *ret, int *bsk, int *upd);
-containers_export BasketRec *baskets;
-containers_export int bsktTop, bsktLimit;
-containers_export lng usec(void);
+/* #define _DEBUG_DATACELL     debug this module */
+#define DCout GDKout
+#define MAXCOL 128
+#define MAXBSK 64
+
+typedef struct{
+       MT_Lock lock;
+       str schema;     /* lives in the datacell schema by default */
+       str name;       /* table that represents the basket */
+       int colcount;
+       str *cols;
+       BAT **primary;  
+       /* statistics */
+} *DCbasket, DCbasketRec;
+
+datacell_export str schema_default;
+datacell_export str DCregister_all( Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+datacell_export str DCregister_basket( Client cntxt, MalBlkPtr mb, MalStkPtr 
stk, InstrPtr pci);
+datacell_export str DCinventory(int *ret);
+datacell_export int DCmemberCount(str schema, str tbl);
+datacell_export int DClocate(str schema, str tbl);
+
+datacell_export str DClock(int *ret, str *schema, str *tbl, int *delay);
+datacell_export str DCunlock(int *ret, str *schema, str *tbl);
+datacell_export str DClock2(int *ret, str *schema, str *tbl);
+
+datacell_export DCbasketRec *baskets;
+datacell_export int bsktTop, bsktLimit;
+datacell_export lng usec(void);
 #endif
 @c
 #include "basket.h"
@@ -143,12 +107,14 @@
 #include "winsock2.h"
 #endif
 
-BasketRec *baskets;
+str schema_default= "datacell";
+
+DCbasketRec *baskets;  /* the datacell catalog */
 int bsktTop, bsktLimit;
 static MT_Lock bsktLock;
 
-#define lockBasketCatalog() mal_set_lock(bsktLock,"basket");
-#define unlockBasketCatalog() mal_unset_lock(bsktLock,"basket");
+#define lockDCbasketCatalog() mal_set_lock(bsktLock,"basket");
+#define unlockDCbasketCatalog() mal_unset_lock(bsktLock,"basket");
 
 @-
 We have to obtain the precise wall-clock time
@@ -163,278 +129,194 @@
     return ((lng) tp.tv_sec ) * LL_CONSTANT(1000000) + (lng) tp.tv_usec;
 }
 
-
-static int BSKTnewEntry(){
+static int DCnewEntry(){
        if(bsktLimit == 0){
-               bsktLimit = 128;
-               baskets= (BasketRec *) GDKzalloc(bsktLimit * sizeof(BasketRec));
+               bsktLimit = MAXBSK;
+               baskets= (DCbasketRec *) GDKzalloc(bsktLimit * 
sizeof(DCbasketRec));
                MT_lock_init(&bsktLock,"basket");
                bsktTop=1; /* entry 0 is used as non-initialized */
        } else
                if( bsktTop == bsktLimit){
-                       bsktLimit +=128;
-                       baskets= (BasketRec *) GDKrealloc(baskets, bsktLimit * 
sizeof(BasketRec));
+                       bsktLimit +=MAXBSK;
+                       baskets= (DCbasketRec *) GDKrealloc(baskets, bsktLimit 
* sizeof(DCbasketRec));
                }
        return bsktTop++;
 }
 
-static int
-BSKTlocate(str grp, str nme){
+int
+DClocate(str schema, str tbl){
        int i;
        for(i=1; i< bsktTop; i++)
-       if(grp && nme &&
-               baskets[i].name && strcmp(nme, baskets[i].name)==0 &&
-               baskets[i].grp && strcmp(grp, baskets[i].grp)==0 )
+       if( tbl && baskets[i].name && strcmp(tbl, baskets[i].name)==0  &&
+               (schema == 0 || (baskets[i].schema && strcmp(schema, 
baskets[i].schema) ==0) ) )
                return i;
        return 0;
 }
 
-int
-BSKTlocateGroup(str grp){
-       int i;
-       for(i=1; i< bsktTop; i++)
-       if( grp && baskets[i].grp && strcmp(grp, baskets[i].grp)==0 )
-               return i;
-       return 0;
-}
+static str
+DCnewbasket(sql_schema *s, sql_table *t, sql_trans *tr)
+{
+       int idx, i;
+       node *o;
+       str msg= MAL_SUCCEED;;
+       BAT *b;
+       sql_column  *c;
 
-str BSKTfind(int *bid, str *grp, str *nme){
-       int i;
-       i= BSKTlocate(*grp,*nme);
-       if( i==0) 
-               throw(MAL,"basket.find","Could not find basket");
-       assert(i>0 && i<bsktTop);
-       if(baskets[i].primary == 0)
-               throw(MAL,"basket.find","Basket not initialized");
-       *bid= baskets[i].primary->batCacheid;
-       BBPincref(*bid,TRUE);
+       mal_set_lock(mal_contextLock, "register_basket");
+       idx= DCnewEntry();
+       MT_lock_init(&baskets[idx].lock,"register_basket");
+       baskets[idx].schema= GDKstrdup(s->base.name);
+       baskets[idx].name= GDKstrdup(t->base.name);
+
+       baskets[idx].colcount = 0;
+    for (o = t->columns.set->h; o; o = o->next)  baskets[idx].colcount++;
+       baskets[idx].cols= GDKzalloc((baskets[idx].colcount + 1) * sizeof 
(str));
+       baskets[idx].primary= GDKzalloc((baskets[idx].colcount + 1) * sizeof 
(BAT *));
+
+       i = 0;
+    for (o = t->columns.set->h; msg == MAL_SUCCEED && o; o = o->next) {
+               c = o->data;
+               b = store_funcs.bind_col(tr, c, 0);
+               if (b == NULL) {
+                       mal_unset_lock(mal_contextLock, "register_basket");
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to