Changeset: adc82a28ea03 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=adc82a28ea03
Added Files:
sql/backends/monet5/iot/Tests/webtest.sql
Modified Files:
monetdb5/optimizer/opt_iot.c
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/Tests/All
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
Branch: iot
Log Message:
Process a iot basket
diffs (truncated from 321 to 300 lines):
diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -90,6 +90,16 @@ OPTiotImplementation(Client cntxt, MalBl
if( j == btop)
btop++;
}
+ if( getModuleId(p)== iotRef && getFunctionId(p) == basketRef){
+ OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream table %s.%s\n", getModuleId(p), getFunctionId(p));
+ schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
+ tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+ for( j =0; j< btop ; j++)
+ if( strcmp(schemas[j], schemas[j+1])==0 && strcmp(tables[j],tables[j+1]) ==0)
+ break;
+ if( j == btop)
+ btop++;
+ }
}
if( btop == MAXBSKT || btop == 0)
return 0;
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -41,8 +41,8 @@ create procedure iot.cycles(n integer)
external name iot.cycles;
-- deliver a new basket with tuples
-create procedure iot.push("schema" string, "table" string, dirpath string)
- external name iot.push;
+create procedure iot.basket("schema" string, "table" string, dirpath string)
+ external name iot.basket;
-- Inspection tables
create function iot.baskets()
diff --git a/sql/backends/monet5/iot/Tests/All b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -1,4 +1,5 @@
-iot00.sql
-iot05.sql
-iot99.sql
-petrinet00.mal
+iot00
+iot05
+iot99
+petrinet00
+webtest
diff --git a/sql/backends/monet5/iot/Tests/webtest.sql b/sql/backends/monet5/iot/Tests/webtest.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/webtest.sql
@@ -0,0 +1,11 @@
+set schema iot;
+create stream table temps( iotclk timestamp, room string , temperature real);
+-- remainder depends on location of the baskets root
+
+declare baskets string;
+set baskets= '/ufs/mk/baskets/measures/temperatures/';
+
+call iot.basket('iot','temps', concat(baskets,'1'));
+select * from temps;
+call iot.basket('iot','temps', concat(baskets,'1'));
+select * from temps;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -113,7 +113,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
sql_column *col = o->data;
int tpe = col->type.type->localtype;
- if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || tpe == TYPE_timestamp) ){
+ if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || tpe == TYPE_timestamp) ){
MT_lock_unset(&iotLock);
throw(MAL,"baskets.register","Unsupported type %d",tpe);
}
@@ -142,24 +142,16 @@ BSKTnewbasket(sql_schema *s, sql_table *
}
// MAL/SQL interface for registration of a single table
-str
-BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+static str
+BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl)
{
sql_schema *s;
sql_table *t;
mvc *m = NULL;
str msg = getSQLContext(cntxt, mb, &m, NULL);
- str sch, tbl;
if ( msg != MAL_SUCCEED)
return msg;
- if( stk == 0){
- sch = getVarConstant(mb, getArg(pci,1)).val.sval;
- tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
- } else{
- sch = *getArgReference_str(stk, pci, 1);
- tbl = *getArgReference_str(stk, pci, 2);
- }
/* check double registration */
if( BSKTlocate(sch, tbl) > 0)
@@ -180,6 +172,21 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
}
str
+BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch, tbl;
+
+ if( stk == 0){
+ sch = getVarConstant(mb, getArg(pci,1)).val.sval;
+ tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
+ } else{
+ sch = *getArgReference_str(stk, pci, 1);
+ tbl = *getArgReference_str(stk, pci, 2);
+ }
+ return BSKTregisterInternal(cntxt,mb,sch,tbl);
+}
+
+str
BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch, tbl;
@@ -314,57 +321,147 @@ BSKTreset(void *ret)
}
/* collect the binary files and append them to what we have */
+#define MAXLINE 4096
str
-BSKTpush(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch = *getArgReference_str(stk, pci, 1);
str tbl = *getArgReference_str(stk, pci, 2);
str dir = *getArgReference_str(stk, pci, 3);
int bskt;
- char buf[BUFSIZ];
+ char buf[PATHLENGTH];
node *n;
mvc *m = NULL;
BAT *b;
- int first=1;
- BUN cnt =0;
- str msg;
+ int first=1,i;
+ BUN cnt =0, bcnt=0;
+ str msg= MAL_SUCCEED;
+ FILE *f;
+ long fsize;
+ char line[MAXLINE];
msg= getSQLContext(cntxt,NULL, &m, NULL);
if( msg != MAL_SUCCEED)
return msg;
+ BSKTregisterInternal(cntxt, mb, sch, tbl);
bskt = BSKTlocate(sch,tbl);
if (bskt == 0)
- throw(SQL, "iot.push", "Could not find the basket %s.%s",sch,tbl);
+ throw(SQL, "iot.basket", "Could not find the basket %s.%s",sch,tbl);
// check access permission to directory first
if( access (dir , F_OK | R_OK)){
- throw(SQL, "iot.push", "Could not access the basket directory %s. error %d",dir,errno);
+ throw(SQL, "iot.basket", "Could not access the basket directory %s. error %d",dir,errno);
}
+ /* check for missing files */
+ for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+ sql_column *c = n->data;
+ snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+ _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+ if( access (buf,R_OK))
+ throw(MAL,"iot.basket","Could not access the column %s file %s\n",c->base.name, buf);
+ b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+ if( b == 0)
+ throw(MAL,"iot.basket","Could not access the column %s\n",c->base.name);
+ }
+
// types are already checked during stream initialization
MT_lock_set(&iotLock);
+ for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; n= n->next){
+ sql_column *c = n->data;
+ snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+ _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+ f= fopen(buf,"r");
+ if( f == NULL){
+ msg= createException(MAL,"iot.basket","Could not access the column %s file %s\n",c->base.name, buf);
+ break;
+ }
+ (void) fseek(f,0, SEEK_END);
+ fsize = ftell(f);
+ rewind(f);
+ b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+ assert( b);
+ bcnt = BATcount(b);
+
+ switch(ATOMstorage(b->ttype)){
+ case TYPE_bit:
+ case TYPE_bte:
+ case TYPE_sht:
+ case TYPE_int:
+ case TYPE_void:
+ case TYPE_oid:
+ case TYPE_flt:
+ case TYPE_dbl:
+ case TYPE_wrd:
+ case TYPE_lng:
+#ifdef HAVE_HGE
+ case TYPE_hge:
+#endif
+ if( BATextend(b, bcnt + fsize / ATOMsize(b->ttype)) != GDK_SUCCEED){
+ BBPunfix(b->batCacheid);
+ (void) fclose(f);
+ msg= createException(MAL,"iot.basket","Could not extend basket %s\n",c->base.name);
+ goto recover;
+ }
+ /* append the binary partition */
+ if( fread(Tloc(b, BUNlast(b)),1,fsize, f) != (size_t) fsize){
+ BBPunfix(b->batCacheid);
+ (void) fclose(f);
+ msg= createException(MAL,"iot.basket","Could not read complete basket file %s\n",c->base.name);
+ goto recover;
+ }
+ BATsetcount(b, bcnt + fsize/ ATOMsize(b->ttype));
+ break;
+ case TYPE_str:
+ while (fgets(line, MAXLINE, f) != 0){
+ if ( line[i= strlen(line)-1] != '\n')
+ msg= createException(MAL,"iot.basket","string too long\n");
+ else{
+ line[i] = 0;
+ BUNappend(b, line, TRUE);
+ bcnt++;
+ }
+ }
+ BATsetcount(b, bcnt );
+ }
+ (void) fclose(f);
+ }
+
+ /* check for mis-aligned columns */
+ for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; n= n->next){
+ sql_column *c = n->data;
+ b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+ assert( b );
+ if( first){
+ first = 0;
+ cnt = BATcount(b);
+ } else
+ if( cnt != BATcount(b))
+ msg= createException(MAL,"iot.basket","Columns mis-aligned %s\n",c->base.name);
+ BBPunfix(b->batCacheid);
+ }
+ /* remove the basket files */
for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
sql_column *c = n->data;
- snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name);
- _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
- //BATattach(c->type.type->localtype,buf,PERSISTENT);
+ snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+ assert( access (buf,R_OK) == 0);
+ //unlink(buf);
+ }
+
+recover:
+ /* reset all BATs when they are misaligned or error occurred */
+ if( msg != MAL_SUCCEED)
+ for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+ sql_column *c = n->data;
b = store_funcs.bind_col(m->session->tr,c,RDONLY);
- if( b){
- baskets[bskt].count = BATcount(b);
- BBPunfix(b->batCacheid);
- if( first){
- cnt = BATcount(b);
- first = 0;
- } else
- if( cnt != BATcount(b)){
- MT_lock_unset(&iotLock);
- throw(MAL,"iot.push","Non-aligned binary input files");
- }
- }
+ assert( b );
+ BATsetcount(b,0);
+ BBPunfix(b->batCacheid);
}
+
MT_lock_unset(&iotLock);
(void) mb;
- return MAL_SUCCEED;
+ return msg;
}
str
BSKTdump(void *ret)
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -92,7 +92,7 @@ iot_export str BSKTappend(Client cntxt,
 iot_export str BSKTdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list