Changeset: bc317bcaa5c1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/bc317bcaa5c1
Modified Files:
clients/Tests/exports.stable.out
monetdb5/mal/mal_namespace.h
monetdb5/optimizer/opt_support.c
sql/backends/monet5/rel_bin.c
Branch: copyfaster
Log Message:
Optimize COPY BINARY ON CLIENT
With ON SERVER, the sql.importColumn calls run in parallel. With ON
CLIENT they run sequentially because the MAPI upload mechanism is not
thread-safe.
In this patch we split the str-like importColumn call into sql.importRaw,
which still runs sequentially, and sql.importNulTerminated, which runs in
parallel.
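Concretely, for a string column loaded ON CLIENT the plan now emits two
instructions where it used to emit one. A rough sketch of the resulting MAL
shape (variable names and literal values are illustrative, not copied from an
actual EXPLAIN output):

    # before, and still for ON SERVER or non-string columns: a single call
    (col:bat[:str], cnt:oid) := sql.importColumn("str", 0:int, bswap, file, onclient, 0:oid);

    # after, for ON CLIENT string columns: importRaw keeps the upload
    # sequential, importNulTerminated decodes the uploaded bytes and can
    # run inside the parallel dataflow region
    (raw:bat[:bte], cnt:oid) := sql.importRaw(file, onclient, 0:oid);
    (col:bat[:str], dummy:oid) := sql.importNulTerminated("str", 0:int, raw, 0:oid);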
diffs (179 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1030,6 +1030,8 @@ const char hgeRef[];
const char identityRef[];
const char ifthenelseRef[];
const char importColumnRef[];
+const char importNulTerminatedRef[];
+const char importRawRef[];
void initNamespace(void);
void insertSymbol(Module scope, Symbol prg);
str instruction2str(MalBlkPtr mb, MalStkPtr stl, InstrPtr p, int hidden);
diff --git a/monetdb5/mal/mal_namespace.h b/monetdb5/mal/mal_namespace.h
--- a/monetdb5/mal/mal_namespace.h
+++ b/monetdb5/mal/mal_namespace.h
@@ -129,6 +129,8 @@
FUNC(identity); \
FUNC(ifthenelse); \
FUNC(importColumn); \
+ FUNC(importNulTerminated); \
+ FUNC(importRaw); \
FUNC(int); \
FUNC(intersect); \
FUNC(intersectcand); \
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -324,6 +324,10 @@ hasSideEffects(MalBlkPtr mb, InstrPtr p,
return FALSE;
if (getFunctionId(p) == importColumnRef)
return FALSE;
+ if (getFunctionId(p) == importRawRef)
+ return FALSE;
+ if (getFunctionId(p) == importNulTerminatedRef)
+ return FALSE;
return TRUE;
}
if (getModuleId(p) == mapiRef) {
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -1266,7 +1266,7 @@ exp2bin_coalesce(backend *be, sql_exp *f
/* This is the per-column portion of exp2bin_copyfrombinary */
static stmt *
-emit_loadcolumn(backend *be, stmt *onclient_stmt, stmt *bswap_stmt, int *count_var, node *file_node, node *type_node)
+emit_loadcolumn(backend *be, int onclient, stmt *onclient_stmt, stmt *bswap_stmt, int *count_var, node *file_node, node *type_node)
{
MalBlkPtr mb = be->mb;
@@ -1296,32 +1296,78 @@ emit_loadcolumn(backend *be, stmt *oncli
int new_count_var = newTmpVariable(mb, TYPE_oid);
- InstrPtr p = newStmt(mb, sqlRef, importColumnRef);
- if (p != NULL) {
- setArgType(mb, p, 0, bat_type);
+ int base_type = ATOMbasetype(data_type);
+ bool split = (onclient > 0 && base_type == TYPE_str);
+
+ InstrPtr p;
+ if (!split) {
+ // Emit a single sql.importColumn statement
+ p = newStmt(mb, sqlRef, importColumnRef);
+ if (p != NULL) {
+ setArgType(mb, p, 0, bat_type);
+ p = pushReturn(mb, p, new_count_var);
+ //
+ p = pushStr(mb, p, method);
+ p = pushInt(mb, p, width);
+ p = pushArgument(mb, p, bswap_stmt->nr);
+ p = pushArgument(mb, p, file_stmt->nr);
+ p = pushArgument(mb, p, onclient_stmt->nr);
+ if (*count_var < 0)
+ p = pushOid(mb, p, 0);
+ else
+ p = pushArgument(mb, p, *count_var);
+ pushInstruction(mb, p);
+ }
+ if (p == NULL || mb->errors)
+ goto malloc_failed;
+ } else {
+ // Emit sql.importRaw followed by sql.importNulTerminated
+
+ p = newStmtArgs(mb, sqlRef, importRawRef, 5);
+ if (p == NULL)
+ goto malloc_failed;
+ setArgType(mb, p, 0, newBatType(TYPE_bte));
p = pushReturn(mb, p, new_count_var);
-
- p = pushStr(mb, p, method);
- p = pushInt(mb, p, width);
- p = pushArgument(mb, p, bswap_stmt->nr);
+ //
p = pushArgument(mb, p, file_stmt->nr);
p = pushArgument(mb, p, onclient_stmt->nr);
if (*count_var < 0)
p = pushOid(mb, p, 0);
else
p = pushArgument(mb, p, *count_var);
+ if (p == NULL || mb->errors)
+ goto malloc_failed;
pushInstruction(mb, p);
- }
- if (p == NULL || mb->errors) {
- if (ma_get_eb(be->mvc->sa)->enabled)
- eb_error(ma_get_eb(be->mvc->sa), be->mvc->errstr[0] ? be->mvc->errstr : mb->errors ? mb->errors : *GDKerrbuf ? GDKerrbuf : "out of memory", 1000);
- return sql_error(be->mvc, 10, SQLSTATE(HY013) MAL_MALLOC_FAIL);
+
+ int tmp_bat = getArg(p, 0);
+ int dummy_count_var = newTmpVariable(mb, TYPE_oid);
+ p = newStmtArgs(mb, sqlRef, importNulTerminatedRef, 6);
+ if (p == NULL)
+ goto malloc_failed;
+ setArgType(mb, p, 0, bat_type);
+ p = pushReturn(mb, p, dummy_count_var);
+
+ p = pushStr(mb, p, method);
+ p = pushInt(mb, p, width);
+ p = pushArgument(mb, p, tmp_bat);
+ if (*count_var < 0)
+ p = pushOid(mb, p, 0);
+ else
+ p = pushArgument(mb, p, new_count_var);
+ if (p == NULL || mb->errors)
+ goto malloc_failed;
+ pushInstruction(mb, p);
}
*count_var = new_count_var;
stmt *s = stmt_blackbox_result(be, p, 0, subtype);
return s;
+
+malloc_failed:
+ if (ma_get_eb(be->mvc->sa)->enabled)
+ eb_error(ma_get_eb(be->mvc->sa), be->mvc->errstr[0] ? be->mvc->errstr : mb->errors ? mb->errors : *GDKerrbuf ? GDKerrbuf : "out of memory", 1000);
+ return sql_error(be->mvc, 10, SQLSTATE(HY013) MAL_MALLOC_FAIL);
}
/* Try to predict which column will be quickest to load first */
@@ -1354,11 +1400,10 @@ exp2bin_copyfrombinary(backend *be, sql_
stmt *bswap_stmt = exp_bin(be, bswap_exp, NULL, NULL, NULL, NULL, NULL, NULL, 0, 0, 0);
/* If it's ON SERVER we can optimize by running the imports in parallel */
- bool onserver = false;
+ int onclient = 1;
if (onclient_exp->type == e_atom) {
atom *onclient_atom = onclient_exp->l;
- int onclient = onclient_atom->data.val.ival;
- onserver = (onclient == 0);
+ onclient = onclient_atom->data.val.ival;
}
node *const first_file = arg_list->h->next->next->next->next;
@@ -1386,7 +1431,7 @@ exp2bin_copyfrombinary(backend *be, sql_
list *columns = sa_list(sql->sa);
if (columns == NULL)
return NULL;
- stmt *prototype_stmt = emit_loadcolumn(be, onclient_stmt, bswap_stmt, &count_var, prototype_file, prototype_type);
+ stmt *prototype_stmt = emit_loadcolumn(be, onclient, onclient_stmt, bswap_stmt, &count_var, prototype_file, prototype_type);
if (!prototype_stmt)
return NULL;
int orig_count_var = count_var;
@@ -1395,12 +1440,12 @@ exp2bin_copyfrombinary(backend *be, sql_
if (type == prototype_type) {
s = prototype_stmt;
} else {
- s = emit_loadcolumn(be, onclient_stmt, bswap_stmt, &count_var, file, type);
+ s = emit_loadcolumn(be, onclient, onclient_stmt, bswap_stmt, &count_var, file, type);
if (!s)
return NULL;
}
list_append(columns, s);
- if (onserver) {
+ if (onclient == 0) {
/* Not threading the count variable from one importColumn to the next
* makes it possible to run them in parallel in a dataflow region. */
count_var = orig_count_var;