Changeset: ed10194a1320 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ed10194a1320
Modified Files:
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
Add window strides
First step towards slowly moving windows based on
row count and time
diffs (129 lines):
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -26,7 +26,7 @@
*/
#include "monetdb_config.h"
-#include <gdk.h>
+#include "gdk.h"
#include "iot.h"
#include "basket.h"
#include "mal_exception.h"
@@ -36,7 +36,7 @@
//#define _DEBUG_BASKET_ if(0)
#define _DEBUG_BASKET_
-str statusname[4] = { "<unknown>", "running", "paused", "locked" };
+str statusname[4] = { "<unknown>", "active", "paused", "locked" };
BasketRec *baskets; /* the global iot catalog */
static int bsktTop = 0, bsktLimit = 0;
@@ -531,12 +531,21 @@ recover:
}
/* remove tuples from a basket according to the sliding policy */
+#define ColumnShift(B,TPE, STRIDE) { \
+ TPE *first= (TPE*) Tloc(B, BUNfirst(B));\
+ TPE *n = first+STRIDE;\
+ TPE *last= (TPE*) Tloc(B, BUNlast(B));\
+ for( ; n < last; n++, first++)\
+ *first=*n;\
+}
+
str
BSKTfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch = *getArgReference_str(stk, pci, 1);
str tbl = *getArgReference_str(stk, pci, 2);
BAT *b;
+ BUN cnt, stride;
node *n;
mvc *m = NULL;
str msg;
@@ -547,12 +556,71 @@ BSKTfinish(Client cntxt, MalBlkPtr mb, M
(void) pci;
msg= getSQLContext(cntxt,NULL, &m, NULL);
+ if( msg )
+ throw(SQL,"iot.finish","Missing SQL context");
bskt = BSKTlocate(sch,tbl);
if (bskt == 0)
throw(SQL, "iot.finish", "Could not find the basket %s.%s",sch,tbl);
- if( msg ==MAL_SUCCEED)
- /* reset all stream BATs to empty*/
+ /* window stride option */
+ if( baskets[bskt].winsize && (stride =baskets[bskt].winstride)){
+shiftcolumns:
+ for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+ sql_column *c = n->data;
+ b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+ assert( b );
+ cnt=BATcount(b);
+ if( cnt < stride)
+ break;
+
+ switch(ATOMstorage(b->ttype)){
+ case TYPE_bit:ColumnShift(b,bit,stride); break;
+ case TYPE_bte:ColumnShift(b,bte,stride); break;
+ case TYPE_sht:ColumnShift(b,sht,stride); break;
+ case TYPE_int:ColumnShift(b,int,stride); break;
+ case TYPE_oid:ColumnShift(b,oid,stride); break;
+ case TYPE_flt:ColumnShift(b,flt,stride); break;
+ case TYPE_dbl:ColumnShift(b,dbl,stride); break;
+ case TYPE_lng:ColumnShift(b,lng,stride); break;
+#ifdef HAVE_HGE
+ case TYPE_hge:ColumnShift(b,hge,stride); break;
+#endif
+ case TYPE_str:
+ switch(b->T->width){
+ case 1: ColumnShift(b,bte,stride); break;
+ case 2: ColumnShift(b,sht,stride); break;
+ case 4: ColumnShift(b,int,stride); break;
+ case 8: ColumnShift(b,lng,stride); break;
+ }
+ break;
+ default: break;
+ }
+ BATsetcount(b, BATcount(b)-stride);
+ BBPunfix(b->batCacheid);
+ }
+ return MAL_SUCCEED;
+ }
+
+ /* time stride option, prepare a new stride based on the leading 'iotclk' */
+ if( baskets[bskt].timeslice && baskets[bskt].timestride){
+ sql_column *c;
+ lng *first, *last, stop;
+ n = baskets[bskt].table->columns.set->h;
+ c = n->data;
+ b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+ assert( b );
+ if( b->ttype !=TYPE_timestamp)
+ throw(SQL, "iot.finish", "Could not find the leading 'iotclk' in %s.%s",sch,tbl);
+ first= (lng*) Tloc(b, BUNfirst(b));
+ last = (lng*) Tloc(b, BUNlast(b));
+ stride =0;
+ stop = *first + baskets[bskt].timestride;
+ for( ; first < last; first++)
+ if (*first >stop) break;
+ goto shiftcolumns;
+ }
+
+ /* default action: reset all stream BATs to empty*/
for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
sql_column *c = n->data;
b = store_funcs.bind_col(m->session->tr,c,RDONLY);
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -48,7 +48,7 @@
#define MAXPN 200 /* it is the minimum, if we need more space GDKrealloc */
-static str statusname[6] = { "<unknown>", "running", "paused"};
+static str statusname[6] = { "<unknown>", "active", "paused"};
static void
PNstartScheduler(void);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list