Changeset: 361404c70821 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/361404c70821
Modified Files:
        sql/backends/monet5/copy.c
        sql/backends/monet5/rel_copy.c
Branch: copyparpipe
Log Message:

Allow very long lines


diffs (truncated from 344 to 300 lines):

diff --git a/sql/backends/monet5/copy.c b/sql/backends/monet5/copy.c
--- a/sql/backends/monet5/copy.c
+++ b/sql/backends/monet5/copy.c
@@ -19,6 +19,9 @@
 
 #include "rel_copy.h"
 
+// #define BLOCK_DEBUG 1
+
+#define MAX_LINE_LENGTH (32 * 1024 * 1024)
 
 #define bailout(f, ...) do { \
                msg = createException(SQL, f,  __VA_ARGS__); \
@@ -65,10 +68,11 @@ dump_data(const char *msg, const char *d
 }
 
 static inline void
-dump_block(const char *msg, BAT *b, lng skip_amount)
+dump_block(const char *msg, BAT *b)
 {
-       int start = (int)skip_amount;
+       int start = (int)b->batInserted;
        int end = (int)BATcount(b);
+       assert(start <= end);
        int len = end - start;
        fprintf(stderr, "%s: bat id %d, range is %d..%d (%d bytes)\n", msg, 
b->batCacheid, start, end, len);
        dump_data("    ", Tloc(b, start), len);
@@ -100,8 +104,12 @@ COPYread(lng *ret_nread, Stream *stream_
        }
 
        BATsetcount(bat, nread);
+       bat->batInserted = 0;
+
        *ret_nread = nread;
-       // dump_block("just read", bat, 0);
+#ifdef BLOCK_DEBUG
+       dump_block("just read", bat);
+#endif
 end:
        if (bat != NULL)
                BBPunfix(bat->batCacheid);
@@ -152,15 +160,18 @@ get_sep_char(str sep, bool backslash_esc
 }
 
 static str
-COPYfixlines(lng *ret_linecount, lng *ret_bytesmoved, bat *left_block, lng 
*left_skip_amount, bat *right_block, str *linesep_arg, str *quote_arg, bit 
*escape)
+COPYfixlines(
+         bat *ret_left, bat *ret_right, lng *ret_linecount,
+         bat *left_block, bat *right_block, str *linesep_arg, str *quote_arg, 
bit *escape)
 {
        str msg = MAL_SUCCEED;
        int linesep, quote;
        bool backslash_escapes;
        BAT *left = NULL, *right = NULL;
+       BAT *new_left = NULL, *new_right = NULL;
        int start, left_size, right_size;
        char *left_data, *right_data;
-       int newline_count;
+       int newline_count = 0;
        int latest_newline;
        bool escape_pending;
        bool quoted;
@@ -176,22 +187,29 @@ COPYfixlines(lng *ret_linecount, lng *re
        if (linesep == quote)
                bailout("copy.fixlines", SQLSTATE(42000) "line separator and 
quote character cannot be the same");
 
-       if (is_bat_nil(*left_block) || is_bat_nil(*right_block) || 
is_lng_nil(*left_skip_amount) || is_bit_nil(*escape))
+       if (is_bat_nil(*left_block) || is_bat_nil(*right_block) || 
is_bit_nil(*escape))
                bailout("copy.fixlines", "arguments must not be nil");
 
        if ((left = BATdescriptor(*left_block)) == NULL || (right = 
BATdescriptor(*right_block)) == NULL)
                bailout("copy.fixlines", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
        if (BATcount(left) > (BUN)INT_MAX || BATcount(right) > (BUN)INT_MAX)
                bailout("copy.fixlines", SQLSTATE(42000) "block size too 
large");
-       if (BATcount(left) < (BUN)*left_skip_amount || *left_skip_amount > 
(lng)INT_MAX)
-               bailout("copy.fixlines", SQLSTATE(42000) "skip amount out of 
bounds");
 
-       // dump_block("fixlines incoming left", left, *left_skip_amount);
-       // dump_block("fixlines incoming right", right, 0);
+       if (BATcount(left) < left->batInserted)
+               bailout("copy.fixlines", SQLSTATE(42000) "skip amount out of 
bounds");
+       if (right->batInserted != 0)
+               bailout("copy.fixlines", SQLSTATE(42000) "right hand skip 
amount expected to be zero, not " BUNFMT, right->batInserted);
+       if (left->hseqbase != 0 || right->hseqbase != 0)
+               bailout("copy.fixlines", SQLSTATE(42000) "hseqbases must be 0");
+
+#ifdef BLOCK_DEBUG
+       dump_block("fixlines incoming left", left);
+       dump_block("fixlines incoming right", right);
+#endif
 
        // Scan 'left' for unquoted newlines. Determine both the count and the position
        // of the last occurrence.
-       start = (int)*left_skip_amount;
+       start = (int)left->batInserted;
        left_size = (int)BATcount(left);
        escape_pending = false;
        quoted = false;
@@ -217,16 +235,18 @@ COPYfixlines(lng *ret_linecount, lng *re
                }
        } else {
                // start == left_size means left block is empty, nothing to do
+               new_left = left;
+               new_right = right;
                *ret_linecount = 0;
-               *ret_bytesmoved = 0;
                msg = MAL_SUCCEED;
                goto end;
        }
 
        if (!escape_pending && !quoted && latest_newline == left_size - 1) {
                // Block ends in a newline, nothing more to do
+               new_left = left;
+               new_right = right;
                *ret_linecount = newline_count;
-               *ret_bytesmoved = 0;
                msg = MAL_SUCCEED;
                goto end;
        }
@@ -251,33 +271,67 @@ COPYfixlines(lng *ret_linecount, lng *re
                        break;
                }
        }
-       if (borrow == -1)
-               bailout("copy.fixlines", SQLSTATE(42000) "line too long");
 
-       if (BATextend(left, left_size + borrow) != GDK_SUCCEED) {
-               bailout("copy.fixlines", GDK_EXCEPTION);
+       if (borrow >= 0) {
+               // Move some bytes from 'right' to 'left'
+               if (BATextend(left, (BUN)left_size + (BUN)borrow) != 
GDK_SUCCEED) {
+                       bailout("copy.fixlines", GDK_EXCEPTION);
+               }
+               memcpy(Tloc(left, left_size), right_data, borrow);
+               BATsetcount(left, (BUN)left_size + (BUN)borrow);
+               right->batInserted = borrow;
+               new_left = left;
+               new_right = right;
+               *ret_linecount = newline_count + 1;
+       } else {
+               // the last line of 'left' is so long it extends all the way through 'right'
+               // into the next block. Our invariant is that 'new_left' must start and end
+               // on line boundaries and 'new_right' must start on a line boundary.
+               // The best way to do so is by appending all of 'right' to 'left' and
+               // returning the resulting jumbo block as 'new_right', with an empty
+               // block as 'new_left'.
+               BUN new_size = (BUN)left_size + (BUN)right_size;
+               if (new_size >= MAX_LINE_LENGTH)
+                       bailout("copy.fixlines", SQLSTATE(42000) "line too 
long: " BUNFMT ", limit set to " BUNFMT, new_size, (BUN)MAX_LINE_LENGTH);
+               if (BATextend(left, new_size) != GDK_SUCCEED) {
+                       bailout("copy.fixlines", GDK_EXCEPTION);
+               }
+               memcpy(Tloc(left, left_size), right_data, right_size);
+               BATsetcount(left, (BUN)left_size + (BUN)right_size);
+               BATsetcount(right, 0);
+               assert(right->batInserted == 0);
+               // notice how 'left' and 'right' cross over:
+               new_left = right;
+               new_right = left;
+               *ret_linecount = 0;
        }
-
-       memcpy(Tloc(left, left_size), right_data, borrow);
-       BATsetcount(left, left_size + borrow);
-
-       assert(*(char*)Tloc(left, BATcount(left) - 1) == linesep);
-
-       *ret_linecount = newline_count + 1;
-       *ret_bytesmoved = borrow;
        msg = MAL_SUCCEED;
 
 end:
-       // if (left)
-       //      dump_block("fixlines outgoing left", left, *left_skip_amount);
-       // if (right)
-       //      dump_block("fixlines outgoing right", right, borrow);
-       // if (left || right)
-       //      fprintf(stderr, "\n");
+
+#ifdef BLOCK_DEBUG
+       fprintf(stderr, "fixlines returning %ld lines\n", *ret_linecount);
+       if (new_left)
+               dump_block("fixlines outgoing new_left", new_left);
+       if (new_right)
+               dump_block("fixlines outgoing new_right", new_right);
+       if (left || right)
+               fprintf(stderr, "\n");
+#endif
+
        if (left != NULL)
                BBPunfix(left->batCacheid);
        if (right != NULL)
                BBPunfix(right->batCacheid);
+       // new_left and new_right are aliases of left and right, but not necessarily in that order.
+       if (new_left != NULL) {
+               *ret_left = new_left->batCacheid;
+               BBPretain(new_left->batCacheid);
+       }
+       if (new_right != NULL) {
+               *ret_right = new_right->batCacheid;
+               BBPretain(new_right->batCacheid);
+       }
        return msg;
 }
 
@@ -640,6 +694,7 @@ scan_fields(
        return 0;
 }
 
+
 static str
 COPYsplitlines(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -652,14 +707,15 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
        int ncols = pci->retc;
        int line_sep, col_sep, quote;
 
+       assert(pci->argc == pci->retc + 7);
        bat block_bat_id = *getArgReference_bat(stk, pci, pci->retc + 0);
-       int skip_amount = *getArgReference_int(stk, pci, pci->retc + 1);
-       lng line_count = *getArgReference_lng(stk, pci, pci->retc + 2);
-       str col_sep_str = *getArgReference_str(stk, pci, pci->retc + 3);
-       str line_sep_str = *getArgReference_str(stk, pci, pci->retc + 4);
-       str quote_str = *getArgReference_str(stk, pci, pci->retc + 5);
-       str null_repr = *getArgReference_str(stk, pci, pci->retc + 6);
-       bool backslash_escapes = *getArgReference_bit(stk, pci, pci->retc + 7);
+       lng line_count = *getArgReference_lng(stk, pci, pci->retc + 1);
+       str col_sep_str = *getArgReference_str(stk, pci, pci->retc + 2);
+       str line_sep_str = *getArgReference_str(stk, pci, pci->retc + 3);
+       str quote_str = *getArgReference_str(stk, pci, pci->retc + 4);
+       str null_repr = *getArgReference_str(stk, pci, pci->retc + 5);
+       bool backslash_escapes = *getArgReference_bit(stk, pci, pci->retc + 6);
+
 
        line_sep = get_sep_char(line_sep_str, backslash_escapes);
        if (line_sep <= 0) // 0 not ok
@@ -677,8 +733,7 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
        if ((block_bat = BATdescriptor(block_bat_id)) == NULL)
                bailout("copy.splitlines", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
 
-       assert(skip_amount >= 0);
-       assert( (line_count == 0) == (skip_amount == (int)BATcount(block_bat)) 
);
+       assert( (line_count == 0) == (BATcount(block_bat) == 
block_bat->batInserted) );
 
        return_bats = GDKzalloc(ncols * sizeof(*return_bats));
        return_indices = GDKzalloc(ncols * sizeof(*return_indices));
@@ -693,10 +748,12 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
                return_indices[i] = Tloc(b, 0);
        }
 
-       // dump_block("splitlines", block_bat, skip_amount);
+#ifdef BLOCK_DEBUG
+       dump_block("splitlines", block_bat);
+#endif
        int ret;
        ret = scan_fields(
-               Tloc(block_bat, 0), skip_amount, Tloc(block_bat, 
BATcount(block_bat)),
+               Tloc(block_bat, 0), (int)block_bat->batInserted, 
Tloc(block_bat, BATcount(block_bat)),
                col_sep, line_sep, quote, backslash_escapes, null_repr,
                ncols, line_count,
                return_indices);
@@ -926,13 +983,13 @@ static mel_func copy_init_funcs[] = {
                arg("stream", streams), arg("block_size", lng), batarg("block", 
bte)
  )),
  command("copy", "fixlines", COPYfixlines, true, "Copy bytes from 'right' to 
'left' to complete the final line of 'left'. Return left line count and bytes 
copied",
-       args(2, 8,
-       arg("linecount", lng), arg("bytesmoved", int),
-       batarg("left",bte), arg("left_skip", int), batarg("right", bte), 
arg("linesep", str), arg("quote", str), arg("escape", bit)
+       args(3, 8,
+       batarg("new_left", bte), batarg("new_right", bte), arg("linecount", 
lng),
+       batarg("left", bte), batarg("right", bte), arg("linesep", str), 
arg("quote", str), arg("escape", bit),
  )),
- pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the 
individual columns", args(1, 9,
+ pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the 
individual columns", args(1, 8,
        batvararg("", int),
-       batarg("block", bte), arg("skip", int), arg("linecount", lng), 
arg("col_sep", str), arg("line_sep", str), arg("quote", str), arg("null_repr", 
str), arg("escape", bit)
+       batarg("block", bte), arg("linecount", lng), arg("col_sep", str), 
arg("line_sep", str), arg("quote", str), arg("null_repr", str), arg("escape", 
bit)
  )),
 
  pattern("copy", "parse_generic", COPYparse_generic, false, "Parse as an 
integer", args(1, 4,
diff --git a/sql/backends/monet5/rel_copy.c b/sql/backends/monet5/rel_copy.c
--- a/sql/backends/monet5/rel_copy.c
+++ b/sql/backends/monet5/rel_copy.c
@@ -79,7 +79,6 @@ emit_send(MalBlkPtr mb, int var_channel,
 struct loop_vars {
        int loop_barrier;
        int our_block;
-       int our_skip_amount;
        int our_line_count;
 };
 
@@ -104,10 +103,6 @@ emit_onserver(
        q = pushLng(mb, q, alloc);
        int var_block_channel = getDestVar(q);
 
-       q = newAssignment(mb);
-       q = pushInt(mb, q, 0);
-       int var_skip_amounts_channel = getDestVar(q);
-
 
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to