Changeset: adc82a28ea03 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=adc82a28ea03
Added Files:
        sql/backends/monet5/iot/Tests/webtest.sql
Modified Files:
        monetdb5/optimizer/opt_iot.c
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/Tests/All
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
Branch: iot
Log Message:

Process an iot basket


diffs (truncated from 321 to 300 lines):

diff --git a/monetdb5/optimizer/opt_iot.c b/monetdb5/optimizer/opt_iot.c
--- a/monetdb5/optimizer/opt_iot.c
+++ b/monetdb5/optimizer/opt_iot.c
@@ -90,6 +90,16 @@ OPTiotImplementation(Client cntxt, MalBl
                        if( j == btop)
                                btop++;
                }
+               if( getModuleId(p)== iotRef && getFunctionId(p) == basketRef){
+                       OPTDEBUGiot mnstr_printf(cntxt->fdout, "#iot stream 
table %s.%s\n", getModuleId(p), getFunctionId(p));
+                       schemas[btop]= getVarConstant(mb, getArg(p,1)).val.sval;
+                       tables[btop]= getVarConstant(mb, getArg(p,2)).val.sval;
+                       for( j =0; j< btop ; j++)
+                       if( strcmp(schemas[j], schemas[j+1])==0  && 
strcmp(tables[j],tables[j+1]) ==0)
+                               break;
+                       if( j == btop)
+                               btop++;
+               }
        }
        if( btop == MAXBSKT || btop == 0)
                return 0;
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -41,8 +41,8 @@ create procedure iot.cycles(n integer)
        external name iot.cycles;
 
 -- deliver a new basket with tuples
-create procedure iot.push("schema" string, "table" string, dirpath string)
-       external name iot.push;
+create procedure iot.basket("schema" string, "table" string, dirpath string)
+       external name iot.basket;
 
 -- Inspection tables
 create function iot.baskets()
diff --git a/sql/backends/monet5/iot/Tests/All b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -1,4 +1,5 @@
-iot00.sql
-iot05.sql
-iot99.sql
-petrinet00.mal
+iot00
+iot05
+iot99
+petrinet00
+webtest
diff --git a/sql/backends/monet5/iot/Tests/webtest.sql b/sql/backends/monet5/iot/Tests/webtest.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/iot/Tests/webtest.sql
@@ -0,0 +1,11 @@
+set schema iot;
+create stream table temps( iotclk timestamp, room string , temperature real);
+-- remainder depends on location of the baskets root
+
+declare baskets string;
+set baskets= '/ufs/mk/baskets/measures/temperatures/';
+
+call iot.basket('iot','temps', concat(baskets,'1'));
+select * from temps;
+call iot.basket('iot','temps', concat(baskets,'1'));
+select * from temps;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -113,7 +113,7 @@ BSKTnewbasket(sql_schema *s, sql_table *
         sql_column *col = o->data;
         int tpe = col->type.type->localtype;
 
-        if ( !(tpe < TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) ){
+        if ( !(tpe <= TYPE_str || tpe == TYPE_date || tpe == TYPE_daytime || 
tpe == TYPE_timestamp) ){
                        MT_lock_unset(&iotLock);
                        throw(MAL,"baskets.register","Unsupported type %d",tpe);
                }
@@ -142,24 +142,16 @@ BSKTnewbasket(sql_schema *s, sql_table *
 }
 
 // MAL/SQL interface for registration of a single table
-str
-BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+static str
+BSKTregisterInternal(Client cntxt, MalBlkPtr mb, str sch, str tbl)
 {
        sql_schema  *s;
        sql_table   *t;
        mvc *m = NULL;
        str msg = getSQLContext(cntxt, mb, &m, NULL);
-       str sch, tbl;
 
        if ( msg != MAL_SUCCEED)
                return msg;
-       if( stk == 0){
-               sch = getVarConstant(mb, getArg(pci,1)).val.sval;
-               tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
-       } else{
-               sch = *getArgReference_str(stk, pci, 1);
-               tbl = *getArgReference_str(stk, pci, 2);
-       }
 
        /* check double registration */
        if( BSKTlocate(sch, tbl) > 0)
@@ -180,6 +172,21 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
 }
 
 str
+BSKTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch, tbl;
+
+       if( stk == 0){
+               sch = getVarConstant(mb, getArg(pci,1)).val.sval;
+               tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
+       } else{
+               sch = *getArgReference_str(stk, pci, 1);
+               tbl = *getArgReference_str(stk, pci, 2);
+       }
+       return BSKTregisterInternal(cntxt,mb,sch,tbl);
+}
+
+str
 BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str sch, tbl;
@@ -314,57 +321,147 @@ BSKTreset(void *ret)
 }
 
 /* collect the binary files and append them to what we have */
+#define MAXLINE 4096
 str 
-BSKTpush(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
     str sch = *getArgReference_str(stk, pci, 1);
     str tbl = *getArgReference_str(stk, pci, 2);
     str dir = *getArgReference_str(stk, pci, 3);
     int bskt;
-       char buf[BUFSIZ];
+       char buf[PATHLENGTH];
        node *n;
        mvc *m = NULL;
        BAT *b;
-       int first=1;
-       BUN cnt =0;
-       str msg;
+       int first=1,i;
+       BUN cnt =0, bcnt=0;
+       str msg= MAL_SUCCEED;
+       FILE *f;
+       long fsize;
+       char line[MAXLINE];
 
        msg= getSQLContext(cntxt,NULL, &m, NULL);
        if( msg != MAL_SUCCEED)
                return msg;
+       BSKTregisterInternal(cntxt, mb, sch, tbl);
     bskt = BSKTlocate(sch,tbl);
        if (bskt == 0)
-               throw(SQL, "iot.push", "Could not find the basket 
%s.%s",sch,tbl);
+               throw(SQL, "iot.basket", "Could not find the basket 
%s.%s",sch,tbl);
 
        // check access permission to directory first
        if( access (dir , F_OK | R_OK)){
-               throw(SQL, "iot.push", "Could not access the basket directory 
%s. error %d",dir,errno);
+               throw(SQL, "iot.basket", "Could not access the basket directory 
%s. error %d",dir,errno);
        }
        
+       /* check for missing files */
+       for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+               sql_column *c = n->data;
+               snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+               _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+               if( access (buf,R_OK))
+                       throw(MAL,"iot.basket","Could not access the column %s 
file %s\n",c->base.name, buf);
+               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               if( b == 0)
+                       throw(MAL,"iot.basket","Could not access the column 
%s\n",c->base.name);
+       }
+
        // types are already checked during stream initialization
        MT_lock_set(&iotLock);
+       for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; 
n= n->next){
+               sql_column *c = n->data;
+               snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+               _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
+               f=  fopen(buf,"r");
+               if( f == NULL){
+                       msg= createException(MAL,"iot.basket","Could not access 
the column %s file %s\n",c->base.name, buf);
+                       break;
+               }
+               (void) fseek(f,0, SEEK_END);
+               fsize = ftell(f);
+               rewind(f);
+               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               assert( b);
+               bcnt = BATcount(b);
+
+               switch(ATOMstorage(b->ttype)){
+               case TYPE_bit:
+               case TYPE_bte:
+               case TYPE_sht:
+               case TYPE_int:
+               case TYPE_void:
+               case TYPE_oid:
+               case TYPE_flt:
+               case TYPE_dbl:
+               case TYPE_wrd:
+               case TYPE_lng:
+#ifdef HAVE_HGE
+               case TYPE_hge:
+#endif
+                       if( BATextend(b, bcnt + fsize / ATOMsize(b->ttype)) != 
GDK_SUCCEED){
+                               BBPunfix(b->batCacheid);
+                               (void) fclose(f);
+                               msg= createException(MAL,"iot.basket","Could 
not extend basket %s\n",c->base.name);
+                               goto recover;
+                       }
+                       /* append the binary partition */
+                       if( fread(Tloc(b, BUNlast(b)),1,fsize, f) != (size_t) 
fsize){
+                               BBPunfix(b->batCacheid);
+                               (void) fclose(f);
+                               msg= createException(MAL,"iot.basket","Could 
not read complete basket file %s\n",c->base.name);
+                               goto recover;
+                       }
+                       BATsetcount(b, bcnt + fsize/ ATOMsize(b->ttype));
+               break;
+               case TYPE_str:
+                       while (fgets(line, MAXLINE, f) != 0){
+                               if ( line[i= strlen(line)-1] != '\n')
+                                       msg= 
createException(MAL,"iot.basket","string too long\n");
+                               else{
+                                       line[i] = 0;
+                                       BUNappend(b, line, TRUE);
+                                       bcnt++;
+                               }
+                       }
+                       BATsetcount(b, bcnt );
+               }
+               (void) fclose(f);
+       }
+
+       /* check for mis-aligned columns */
+       for( n = baskets[bskt].table->columns.set->h; msg == MAL_SUCCEED && n; 
n= n->next){
+               sql_column *c = n->data;
+               b = store_funcs.bind_col(m->session->tr,c,RDONLY);
+               assert( b );
+               if( first){
+                       first = 0;
+                       cnt = BATcount(b);
+               } else
+               if( cnt != BATcount(b))
+                       msg= createException(MAL,"iot.basket","Columns 
mis-aligned %s\n",c->base.name);
+               BBPunfix(b->batCacheid);
+       }
+       /* remove the basket files */
        for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
                sql_column *c = n->data;
-               snprintf(buf,BUFSIZ, "%s%c%s",dir,DIR_SEP, c->base.name);
-               _DEBUG_BASKET_ mnstr_printf(BSKTout,"Attach the file %s\n",buf);
-               //BATattach(c->type.type->localtype,buf,PERSISTENT);
+               snprintf(buf,PATHLENGTH, "%s%c%s",dir,DIR_SEP, c->base.name);
+               assert( access (buf,R_OK) == 0);
+               //unlink(buf);
+       }
+
+recover:
+       /* reset all BATs when they are misaligned or error occurred */
+       if( msg != MAL_SUCCEED)
+       for( n = baskets[bskt].table->columns.set->h; n; n= n->next){
+               sql_column *c = n->data;
                b = store_funcs.bind_col(m->session->tr,c,RDONLY);
-               if( b){ 
-                       baskets[bskt].count = BATcount(b);
-                       BBPunfix(b->batCacheid);
-                       if( first){
-                               cnt = BATcount(b);
-                               first = 0;
-                       } else
-                               if( cnt != BATcount(b)){
-                                       MT_lock_unset(&iotLock);
-                                       throw(MAL,"iot.push","Non-aligned 
binary input files");
-                               }
-               }
+               assert( b );
+               BATsetcount(b,0);
+               BBPunfix(b->batCacheid);
        }
+
        MT_lock_unset(&iotLock);
     (void) mb;
-    return MAL_SUCCEED;
+    return msg;
 }
 str
 BSKTdump(void *ret)
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -92,7 +92,7 @@ iot_export str BSKTappend(Client cntxt, 
 iot_export str BSKTdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to