Changeset: bc317bcaa5c1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/bc317bcaa5c1
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/mal/mal_namespace.h
        monetdb5/optimizer/opt_support.c
        sql/backends/monet5/rel_bin.c
Branch: copyfaster
Log Message:

Optimize COPY BINARY ON CLIENT

With ON SERVER, the sql.importColumn calls run in parallel. With ON
CLIENT they run sequentially because the mapi upload mechanism is not
thread-safe.

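This patch splits importColumn for string-like columns into two calls:
sql.importRaw, which receives the raw bytes from the client and must still
run sequentially, and sql.importNulTerminated, which decodes those bytes
into strings and can run in parallel.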


diffs (179 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1030,6 +1030,8 @@ const char hgeRef[];
 const char identityRef[];
 const char ifthenelseRef[];
 const char importColumnRef[];
+const char importNulTerminatedRef[];
+const char importRawRef[];
 void initNamespace(void);
 void insertSymbol(Module scope, Symbol prg);
 str instruction2str(MalBlkPtr mb, MalStkPtr stl, InstrPtr p, int hidden);
diff --git a/monetdb5/mal/mal_namespace.h b/monetdb5/mal/mal_namespace.h
--- a/monetdb5/mal/mal_namespace.h
+++ b/monetdb5/mal/mal_namespace.h
@@ -129,6 +129,8 @@
        FUNC(identity); \
        FUNC(ifthenelse); \
        FUNC(importColumn); \
+       FUNC(importNulTerminated); \
+       FUNC(importRaw); \
        FUNC(int); \
        FUNC(intersect); \
        FUNC(intersectcand); \
diff --git a/monetdb5/optimizer/opt_support.c b/monetdb5/optimizer/opt_support.c
--- a/monetdb5/optimizer/opt_support.c
+++ b/monetdb5/optimizer/opt_support.c
@@ -324,6 +324,10 @@ hasSideEffects(MalBlkPtr mb, InstrPtr p,
                        return FALSE;
                if (getFunctionId(p) == importColumnRef)
                        return FALSE;
+               if (getFunctionId(p) == importRawRef)
+                       return FALSE;
+               if (getFunctionId(p) == importNulTerminatedRef)
+                       return FALSE;
                return TRUE;
        }
        if (getModuleId(p) == mapiRef) {
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -1266,7 +1266,7 @@ exp2bin_coalesce(backend *be, sql_exp *f
 
 /* This is the per-column portion of exp2bin_copyfrombinary */
 static stmt *
-emit_loadcolumn(backend *be, stmt *onclient_stmt, stmt *bswap_stmt,  int 
*count_var, node *file_node, node *type_node)
+emit_loadcolumn(backend *be, int onclient, stmt *onclient_stmt, stmt 
*bswap_stmt,  int *count_var, node *file_node, node *type_node)
 {
        MalBlkPtr mb = be->mb;
 
@@ -1296,32 +1296,78 @@ emit_loadcolumn(backend *be, stmt *oncli
 
        int new_count_var = newTmpVariable(mb, TYPE_oid);
 
-       InstrPtr p = newStmt(mb, sqlRef, importColumnRef);
-       if (p != NULL) {
-               setArgType(mb, p, 0, bat_type);
+       int base_type = ATOMbasetype(data_type);
+       bool split = (onclient > 0 && base_type == TYPE_str);
+
+       InstrPtr p;
+       if (!split) {
+               // Emit a single sql.importColumn statement
+               p = newStmt(mb, sqlRef, importColumnRef);
+               if (p != NULL) {
+                       setArgType(mb, p, 0, bat_type);
+                       p = pushReturn(mb, p, new_count_var);
+                       //
+                       p = pushStr(mb, p, method);
+                       p = pushInt(mb, p, width);
+                       p = pushArgument(mb, p, bswap_stmt->nr);
+                       p = pushArgument(mb, p, file_stmt->nr);
+                       p = pushArgument(mb, p, onclient_stmt->nr);
+                       if (*count_var < 0)
+                               p = pushOid(mb, p, 0);
+                       else
+                               p = pushArgument(mb, p, *count_var);
+                       pushInstruction(mb, p);
+               }
+               if (p == NULL || mb->errors)
+                       goto malloc_failed;
+       } else {
+               // Emit sql.importRaw followed by sql.importNulTerminated
+
+               p = newStmtArgs(mb, sqlRef, importRawRef, 5);
+               if (p == NULL)
+                       goto malloc_failed;
+               setArgType(mb, p, 0, newBatType(TYPE_bte));
                p = pushReturn(mb, p, new_count_var);
-
-               p = pushStr(mb, p, method);
-               p = pushInt(mb, p, width);
-               p = pushArgument(mb, p, bswap_stmt->nr);
+               //
                p = pushArgument(mb, p, file_stmt->nr);
                p = pushArgument(mb, p, onclient_stmt->nr);
                if (*count_var < 0)
                        p = pushOid(mb, p, 0);
                else
                        p = pushArgument(mb, p, *count_var);
+               if (p == NULL || mb->errors)
+                       goto malloc_failed;
                pushInstruction(mb, p);
-       }
-       if (p == NULL || mb->errors) {
-               if (ma_get_eb(be->mvc->sa)->enabled)
-                       eb_error(ma_get_eb(be->mvc->sa), be->mvc->errstr[0] ? 
be->mvc->errstr : mb->errors ? mb->errors : *GDKerrbuf ? GDKerrbuf : "out of 
memory", 1000);
-               return sql_error(be->mvc, 10, SQLSTATE(HY013) MAL_MALLOC_FAIL);
+
+               int tmp_bat = getArg(p, 0);
+               int dummy_count_var = newTmpVariable(mb, TYPE_oid);
+               p = newStmtArgs(mb, sqlRef, importNulTerminatedRef, 6);
+               if (p == NULL)
+                       goto malloc_failed;
+               setArgType(mb, p, 0, bat_type);
+               p = pushReturn(mb, p, dummy_count_var);
+
+               p = pushStr(mb, p, method);
+               p = pushInt(mb, p, width);
+               p = pushArgument(mb, p, tmp_bat);
+               if (*count_var < 0)
+                       p = pushOid(mb, p, 0);
+               else
+                       p = pushArgument(mb, p, new_count_var);
+               if (p == NULL || mb->errors)
+                       goto malloc_failed;
+               pushInstruction(mb, p);
        }
 
        *count_var = new_count_var;
 
        stmt *s = stmt_blackbox_result(be, p, 0, subtype);
        return s;
+
+malloc_failed:
+               if (ma_get_eb(be->mvc->sa)->enabled)
+                       eb_error(ma_get_eb(be->mvc->sa), be->mvc->errstr[0] ? 
be->mvc->errstr : mb->errors ? mb->errors : *GDKerrbuf ? GDKerrbuf : "out of 
memory", 1000);
+               return sql_error(be->mvc, 10, SQLSTATE(HY013) MAL_MALLOC_FAIL);
 }
 
 /* Try to predict which column will be quickest to load first */
@@ -1354,11 +1400,10 @@ exp2bin_copyfrombinary(backend *be, sql_
        stmt *bswap_stmt = exp_bin(be, bswap_exp, NULL, NULL, NULL, NULL, NULL, 
NULL, 0, 0, 0);
 
        /* If it's ON SERVER we can optimize by running the imports in parallel 
*/
-       bool onserver = false;
+       int onclient = 1;
        if (onclient_exp->type == e_atom) {
                atom *onclient_atom = onclient_exp->l;
-               int onclient = onclient_atom->data.val.ival;
-               onserver = (onclient == 0);
+               onclient = onclient_atom->data.val.ival;
        }
 
        node *const first_file = arg_list->h->next->next->next->next;
@@ -1386,7 +1431,7 @@ exp2bin_copyfrombinary(backend *be, sql_
        list *columns = sa_list(sql->sa);
        if (columns == NULL)
                return NULL;
-       stmt *prototype_stmt = emit_loadcolumn(be, onclient_stmt, bswap_stmt, 
&count_var, prototype_file, prototype_type);
+       stmt *prototype_stmt = emit_loadcolumn(be, onclient, onclient_stmt, 
bswap_stmt, &count_var, prototype_file, prototype_type);
        if (!prototype_stmt)
                return NULL;
        int orig_count_var = count_var;
@@ -1395,12 +1440,12 @@ exp2bin_copyfrombinary(backend *be, sql_
                if (type == prototype_type) {
                        s = prototype_stmt;
                } else {
-                       s = emit_loadcolumn(be, onclient_stmt, bswap_stmt, 
&count_var, file, type);
+                       s = emit_loadcolumn(be, onclient, onclient_stmt, 
bswap_stmt, &count_var, file, type);
                        if (!s)
                                return NULL;
                }
                list_append(columns, s);
-               if (onserver) {
+               if (onclient == 0) {
                        /* Not threading the count variable from one 
importColumn to the next
                         * makes it possible to run them in parallel in a 
dataflow region. */
                        count_var = orig_count_var;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to