Changeset: 0dfed08a75b4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0dfed08a75b4
Added Files:
sql/test/centipede/Tests/olap.stable.err
sql/test/centipede/Tests/olap.stable.out
sql/test/json/Tests/books.stable.err
sql/test/json/Tests/books.stable.out
Modified Files:
monetdb5/modules/mal/tablet.c
monetdb5/optimizer/opt_centipede.c
sql/backends/monet5/rel_bin.c
sql/include/sql_relation.h
sql/server/rel_updates.c
sql/server/sql_parser.y
sql/storage/bat/bat_storage.c
sql/storage/bat/bat_storage.h
Branch: default
Log Message:
merge with default branch for Feb2013 release branch
diffs (truncated from 1424 to 300 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -1341,6 +1341,8 @@ SQLloader(void *p)
}
}
+#define MAXWORKERS 64
+
BUN
SQLload_file(Client cntxt, Tablet *as, bstream *b, stream *out, char *csep,
char *rsep, char quote, lng skip, lng maxrow)
{
@@ -1351,13 +1353,13 @@ SQLload_file(Client cntxt, Tablet *as, b
BUN i;
size_t rseplen;
READERtask *task = (READERtask *) GDKzalloc(sizeof(READERtask));
- READERtask ptask[16];
- int threads = (!maxrow || maxrow > (1 << 16)) ? (GDKnr_threads < 16 ?
GDKnr_threads : 16) : 1;
+ READERtask ptask[MAXWORKERS];
+ int threads = (!maxrow || maxrow > (1 << 16)) ? (GDKnr_threads <
MAXWORKERS ? GDKnr_threads : MAXWORKERS) : 1;
lng lio = 0, tio, t1 = 0, total = 0, iototal = 0;
int vmtrim = GDK_vm_trim;
str msg = MAL_SUCCEED;
- for (i = 0; i < 16; i++)
+ for (i = 0; i < MAXWORKERS; i++)
ptask[i].cols = 0;
if (task == 0) {
@@ -1524,7 +1526,7 @@ SQLload_file(Client cntxt, Tablet *as, b
* In the first phase we simply break the lines at the
* record boundary. */
if (quote == 0) {
- if (rseplen == 1)
+ if (rseplen == 1) {
for (; *e; e++) {
if (*e == '\\') {
e++;
@@ -1532,7 +1534,17 @@ SQLload_file(Client cntxt, Tablet *as, b
}
if (*e == *rsep)
break;
- } else
+ }
+ } else if (rseplen == 2) {
+ for (; *e; e++) {
+ if (*e == '\\') {
+ e++;
+ continue;
+ }
+ if (*e == *rsep && e[1] ==
rsep[1])
+ break;
+ }
+ } else {
for (; *e; e++) {
if (*e == '\\') {
e++;
@@ -1541,6 +1553,7 @@ SQLload_file(Client cntxt, Tablet *as, b
if (*e == *rsep && strncmp(e,
rsep, rseplen) == 0)
break;
}
+ }
if (*e == 0)
e = 0; /* nonterminated
record, we need more */
} else if (rseplen == 1) {
@@ -1557,6 +1570,20 @@ SQLload_file(Client cntxt, Tablet *as, b
}
if (*e == 0)
e = 0; /* nonterminated
record, we need more */
+ } else if (rseplen == 2) {
+ for (; *e; e++) {
+ if (*e == q)
+ q = 0;
+ else if (*e == quote)
+ q = *e;
+ else if (*e == '\\') {
+ if (e[1])
+ e++;
+ } else if (!q && e[0] == rsep[0] &&
e[1] == rsep[1])
+ break;
+ }
+ if (*e == 0)
+ e = 0; /* nonterminated
record, we need more */
} else {
for (; *e; e++) {
if (*e == q)
@@ -1728,7 +1755,7 @@ SQLload_file(Client cntxt, Tablet *as, b
GDKfree(task->base);
GDKfree(task);
}
- for (i = 0; i < 16; i++)
+ for (i = 0; i < MAXWORKERS; i++)
if (ptask[i].cols)
GDKfree(ptask[i].cols);
#ifdef MLOCK_TST
diff --git a/monetdb5/optimizer/opt_centipede.c
b/monetdb5/optimizer/opt_centipede.c
--- a/monetdb5/optimizer/opt_centipede.c
+++ b/monetdb5/optimizer/opt_centipede.c
@@ -34,10 +34,10 @@
#define DEBUG_OPT_DETAIL
#define _DEBUG_OPT_CENTIPEDE_
-#define BLOCKED 1
-#define PARTITION 2 // phase 1 result
-#define PIVOT 3 // Instruction requires care at next level
-#define SUPPORTIVE 4 // phase 2 result
+#define BLOCKED 1 // Instruction should remain in main routine
+#define PARTITION 2 // Instruction is part of the fragment routine
+#define PIVOT 3 // Instruction is part of the consolidation routine
+#define SUPPORTIVE 4// Instruction is part of fragment routine
/*
* The columns are broken using fixed OID ranges.
@@ -59,14 +59,15 @@ static int nrservers;
* geared at parallel execution
*/
static MalBlkPtr
-OPTexecController(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, InstrPtr ret,
Slices *slices, oid plantag, int *status)
+OPTexecController(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, InstrPtr ret,
InstrPtr packs, Slices *slices, oid plantag, int *status)
{
MalBlkPtr cmb;
Symbol s;
char nme[BUFSIZ], *plan, *stub;
int barrier, x, i, j, k, *alias, nrpack;
- InstrPtr p, q, *pack;
+ InstrPtr p=0, q, *pack;
+ (void) p;
/* define the query controller */
//snprintf(nme, BUFSIZ, "%s_plan"OIDFMT, getFunctionId(
getInstrPtr(mb,0)), plantag);
//putName(nme, strlen(nme));
@@ -132,19 +133,11 @@ OPTexecController(Client cntxt, MalBlkPt
pack[k] = newInstruction(cmb,ASSIGNsymbol);
getModuleId(pack[k]) = matRef;
getFunctionId(pack[k]) = packRef;
- getArg(pack[k],0) = getArg(p,k);
+ getArg(pack[k],0) = getArg(packs,k);
+ assert(packs->argv[k] >=0);
}
}
- /* under dataflow control, initialize the variables
- Arguments are considered defined already */
- for ( k=0 ; k < nrpack ; k++){
- q = newInstruction(cmb,ASSIGNsymbol);
- getArg(q,0) = getArg(pack[k],0);
- pushNil(cmb,q, getArgType(cmb,pack[k],0));
- pushInstruction(cmb,q);
- }
-
#ifdef REMOTE_EXECUTION
q= newFcnCall(cmb,schedulerRef,srvpoolRef);
#else
@@ -198,45 +191,41 @@ OPTexecController(Client cntxt, MalBlkPt
for ( k=0 ; k < nrpack; k++) {
/* after packing we may have to re-do groupings*/
pushInstruction(cmb, pack[k]);
- setVarUsed(cmb,getArg(pack[k],0));
+ setVarUsed(cmb, getArg(pack[k],0));
}
}
-
- /* finalize the dataflow block */
- q= newAssignment(cmb);
- q->barrier = EXITsymbol;
- getArg(q,0) = barrier;
+#ifdef _DEBUG_OPT_CENTIPEDE_
+ mnstr_printf(cntxt->fdout,"\n#cmb structure\n");
+ printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT);
+#endif
(void) status;
/* look for pivot operations in original plan */
for ( i=1; i < mb->stop; i++)
if (status[i] == PIVOT){
+ char buf[BUFSIZ];
q= copyInstruction(getInstrPtr(mb,i));
#ifdef _DEBUG_OPT_CENTIPEDE_
- if ( status[i]){
- mnstr_printf(cntxt->fdout,"\n#cmb include stmt %d status
%d\n",i,status[i]);
+ mnstr_printf(cntxt->fdout,"#cmb include stmt %d status
%d:",i,status[i]);
printInstruction(cntxt->fdout, mb, 0, q,LIST_MAL_STMT);
- }
+ for(k=0; k<q->argc;k++)
+ assert(getArg(q,k) >=0);
#endif
-/*
- for( j= q->retc; j<q->argc; j++){
- int idx;
- InstrPtr pq;
- snprintf(nme,BUFSIZ,"C_%d",getArg(q,j));
- idx= findVariable(pmb,nme);
- if ( idx >= 0)
- getArg(q,j) = idx;
-
- pq= getInstrPtr(cmb,0);
- for ( k = 0; k< pq->retc; k++)
- if (getArg(pq,k) == getArg(q, q->retc)){
- delArgument(pq,k);
- break;
- }
- }
-*/
if (getModuleId(q) == groupRef && (getFunctionId(q) ==
subgroupRef || getFunctionId(q) == subgroupdoneRef)){
+ snprintf(buf,BUFSIZ,"Y_%d",getArg(q,q->retc));
q= copyInstruction(q);
+ k = findVariable(cmb,buf);
+ assert(k >=0);
+ if ( k == -1)
+ getArg(q,q->retc) =
newVariable(cmb,GDKstrdup(buf),TYPE_any);
+ else getArg(q,q->retc) = k;
+ pushInstruction(cmb,q);
+ } else
+ if (getModuleId(q) == aggrRef && getFunctionId(q) == countRef ){
+ q= copyInstruction(q);
+ getFunctionId(q) = sumRef;
+ // correct the return statement
+ setVarType(cmb, getArg(q,1), newBatType(TYPE_oid,
TYPE_wrd));
pushInstruction(cmb,q);
} else
if (getModuleId(q) == aggrRef && getFunctionId(q) ==
subcountRef ){
@@ -251,11 +240,24 @@ OPTexecController(Client cntxt, MalBlkPt
q= copyInstruction(q);
getArg(q,1) = getArg(q,0);
pushInstruction(cmb,q);
+ } else
+ if (getModuleId(q) == algebraRef && getFunctionId(q) ==
leftfetchjoinRef ){
+ snprintf(buf,BUFSIZ,"Y_%d",getArg(q,q->argc-1));
+ q= copyInstruction(q);
+ k = findVariable(cmb,buf);
+ if ( k >=0)
+ getArg(q,q->argc-1) = k;
+ pushInstruction(cmb,q);
} else{
q= copyInstruction(q);
pushInstruction(cmb,q);
}
}
+ /* finalize the dataflow block */
+ q= newAssignment(cmb);
+ q->barrier = EXITsymbol;
+ getArg(q,0) = barrier;
+
/* consolidate the result of the control function */
ret = copyInstruction(ret);
@@ -273,7 +275,7 @@ OPTexecController(Client cntxt, MalBlkPt
cmb->stmt[0]= pushReturn(cmb, cmb->stmt[0], getArg(ret,i));
pushEndInstruction(cmb);
#ifdef _DEBUG_OPT_CENTIPEDE_
- mnstr_printf(cntxt->fdout,"\n#pmb stmt\n");
+ mnstr_printf(cntxt->fdout,"#pmb stmt ");
printInstruction(cntxt->fdout, pmb, 0,
getInstrPtr(pmb,0),LIST_MAL_STMT);
mnstr_printf(cntxt->fdout,"\n#cmb stmt\n");
printInstruction(cntxt->fdout,cmb, 0, getInstrPtr(cmb,0),LIST_MAL_STMT);
@@ -459,6 +461,21 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
* when a connection is re-used by different client sessions.
*/
+static void addvartolist(MalBlkPtr mb, InstrPtr *pq, int arg)
+{
+ int k;
+ InstrPtr p = *pq;
+
+ for ( k = 0; k < p->retc; k++)
+ if (getArg(p,k) == arg)
+ break;
+ if ( k == p->retc){
+ p = pushReturn(mb, p, arg);
+ //p = pushArgument(mb, p, arg);
+ }
+ *pq = p;
+}
+
#ifdef _DEBUG_OPT_CENTIPEDE_
static char *statusname[7]= {"", "blocked ", "partition ", "pivot ",
"support ", "exported ", "keeplocal "};
#endif
@@ -468,7 +485,7 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb,
{
int *status,*vars;
int i, j, k, limit, last;
- InstrPtr ret, orig, planargs= 0, call, q = NULL, p = NULL, *old;
+ InstrPtr cntrlreturn, planreturn, orig, packs= 0, call, q = NULL, p =
NULL, *old;
Symbol s;
MalBlkPtr plan, cntrl, stub;
str msg= MAL_SUCCEED;
@@ -534,11 +551,7 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb,
vars[getArg(p,0)] = PARTITION;
} else
if ( getModuleId(p) == sqlRef && getFunctionId(p) == deltaRef ){
- // Use a readonly view on the database TO BE FIXED
- //clrFunction(p);
- //p->argc =2;
- //p->token = ASSIGNsymbol;
- if ( vars[getArg(p,1)] ){
+ if ( vars[getArg(p,1)] == PARTITION ){
status[i] = PARTITION;
vars[getArg(p,0)] = PARTITION;
}
@@ -546,118 +559,104 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb,
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list