Changeset: 361404c70821 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/361404c70821
Modified Files:
sql/backends/monet5/copy.c
sql/backends/monet5/rel_copy.c
Branch: copyparpipe
Log Message:
Allow very long lines
diffs (truncated from 344 to 300 lines):
diff --git a/sql/backends/monet5/copy.c b/sql/backends/monet5/copy.c
--- a/sql/backends/monet5/copy.c
+++ b/sql/backends/monet5/copy.c
@@ -19,6 +19,9 @@
#include "rel_copy.h"
+// #define BLOCK_DEBUG 1
+
+#define MAX_LINE_LENGTH (32 * 1024 * 1024)
#define bailout(f, ...) do { \
msg = createException(SQL, f, __VA_ARGS__); \
@@ -65,10 +68,11 @@ dump_data(const char *msg, const char *d
}
static inline void
-dump_block(const char *msg, BAT *b, lng skip_amount)
+dump_block(const char *msg, BAT *b)
{
- int start = (int)skip_amount;
+ int start = (int)b->batInserted;
int end = (int)BATcount(b);
+ assert(start <= end);
int len = end - start;
fprintf(stderr, "%s: bat id %d, range is %d..%d (%d bytes)\n", msg,
b->batCacheid, start, end, len);
dump_data(" ", Tloc(b, start), len);
@@ -100,8 +104,12 @@ COPYread(lng *ret_nread, Stream *stream_
}
BATsetcount(bat, nread);
+ bat->batInserted = 0;
+
*ret_nread = nread;
- // dump_block("just read", bat, 0);
+#ifdef BLOCK_DEBUG
+ dump_block("just read", bat);
+#endif
end:
if (bat != NULL)
BBPunfix(bat->batCacheid);
@@ -152,15 +160,18 @@ get_sep_char(str sep, bool backslash_esc
}
static str
-COPYfixlines(lng *ret_linecount, lng *ret_bytesmoved, bat *left_block, lng
*left_skip_amount, bat *right_block, str *linesep_arg, str *quote_arg, bit
*escape)
+COPYfixlines(
+ bat *ret_left, bat *ret_right, lng *ret_linecount,
+ bat *left_block, bat *right_block, str *linesep_arg, str *quote_arg,
bit *escape)
{
str msg = MAL_SUCCEED;
int linesep, quote;
bool backslash_escapes;
BAT *left = NULL, *right = NULL;
+ BAT *new_left = NULL, *new_right = NULL;
int start, left_size, right_size;
char *left_data, *right_data;
- int newline_count;
+ int newline_count = 0;
int latest_newline;
bool escape_pending;
bool quoted;
@@ -176,22 +187,29 @@ COPYfixlines(lng *ret_linecount, lng *re
if (linesep == quote)
bailout("copy.fixlines", SQLSTATE(42000) "line separator and
quote character cannot be the same");
- if (is_bat_nil(*left_block) || is_bat_nil(*right_block) ||
is_lng_nil(*left_skip_amount) || is_bit_nil(*escape))
+ if (is_bat_nil(*left_block) || is_bat_nil(*right_block) ||
is_bit_nil(*escape))
bailout("copy.fixlines", "arguments must not be nil");
if ((left = BATdescriptor(*left_block)) == NULL || (right =
BATdescriptor(*right_block)) == NULL)
bailout("copy.fixlines", SQLSTATE(HY002)
RUNTIME_OBJECT_MISSING);
if (BATcount(left) > (BUN)INT_MAX || BATcount(right) > (BUN)INT_MAX)
bailout("copy.fixlines", SQLSTATE(42000) "block size too
large");
- if (BATcount(left) < (BUN)*left_skip_amount || *left_skip_amount >
(lng)INT_MAX)
- bailout("copy.fixlines", SQLSTATE(42000) "skip amount out of
bounds");
- // dump_block("fixlines incoming left", left, *left_skip_amount);
- // dump_block("fixlines incoming right", right, 0);
+ if (BATcount(left) < left->batInserted)
+ bailout("copy.fixlines", SQLSTATE(42000) "skip amount out of
bounds");
+ if (right->batInserted != 0)
+ bailout("copy.fixlines", SQLSTATE(42000) "right hand skip
amount expected to be zero, not " BUNFMT, right->batInserted);
+ if (left->hseqbase != 0 || right->hseqbase != 0)
+ bailout("copy.fixlines", SQLSTATE(42000) "hseqbases must be 0");
+
+#ifdef BLOCK_DEBUG
+ dump_block("fixlines incoming left", left);
+ dump_block("fixlines incoming right", right);
+#endif
// Scan 'left' for unquoted newlines. Determine both the count and the position
// of the last occurrence.
- start = (int)*left_skip_amount;
+ start = (int)left->batInserted;
left_size = (int)BATcount(left);
escape_pending = false;
quoted = false;
@@ -217,16 +235,18 @@ COPYfixlines(lng *ret_linecount, lng *re
}
} else {
// start == left_size means left block is empty, nothing to do
+ new_left = left;
+ new_right = right;
*ret_linecount = 0;
- *ret_bytesmoved = 0;
msg = MAL_SUCCEED;
goto end;
}
if (!escape_pending && !quoted && latest_newline == left_size - 1) {
// Block ends in a newline, nothing more to do
+ new_left = left;
+ new_right = right;
*ret_linecount = newline_count;
- *ret_bytesmoved = 0;
msg = MAL_SUCCEED;
goto end;
}
@@ -251,33 +271,67 @@ COPYfixlines(lng *ret_linecount, lng *re
break;
}
}
- if (borrow == -1)
- bailout("copy.fixlines", SQLSTATE(42000) "line too long");
- if (BATextend(left, left_size + borrow) != GDK_SUCCEED) {
- bailout("copy.fixlines", GDK_EXCEPTION);
+ if (borrow >= 0) {
+ // Move some bytes from 'right' to 'left'
+ if (BATextend(left, (BUN)left_size + (BUN)borrow) !=
GDK_SUCCEED) {
+ bailout("copy.fixlines", GDK_EXCEPTION);
+ }
+ memcpy(Tloc(left, left_size), right_data, borrow);
+ BATsetcount(left, (BUN)left_size + (BUN)borrow);
+ right->batInserted = borrow;
+ new_left = left;
+ new_right = right;
+ *ret_linecount = newline_count + 1;
+ } else {
+ // the last line of 'left' is so long it extends all the way through 'right'
+ // into the next block. Our invariant is that 'new_left' must start and end
+ // on line boundaries and 'new_right' must start on a line boundary.
+ // The best way to do so is by appending all of 'right' to 'left' and
+ // returning the resulting jumbo block as 'new_right', with an empty
+ // block as 'new_left'.
+ BUN new_size = (BUN)left_size + (BUN)right_size;
+ if (new_size >= MAX_LINE_LENGTH)
+ bailout("copy.fixlines", SQLSTATE(42000) "line too
long: " BUNFMT ", limit set to " BUNFMT, new_size, (BUN)MAX_LINE_LENGTH);
+ if (BATextend(left, new_size) != GDK_SUCCEED) {
+ bailout("copy.fixlines", GDK_EXCEPTION);
+ }
+ memcpy(Tloc(left, left_size), right_data, right_size);
+ BATsetcount(left, (BUN)left_size + (BUN)right_size);
+ BATsetcount(right, 0);
+ assert(right->batInserted == 0);
+ // notice how 'left' and 'right' cross over:
+ new_left = right;
+ new_right = left;
+ *ret_linecount = 0;
}
-
- memcpy(Tloc(left, left_size), right_data, borrow);
- BATsetcount(left, left_size + borrow);
-
- assert(*(char*)Tloc(left, BATcount(left) - 1) == linesep);
-
- *ret_linecount = newline_count + 1;
- *ret_bytesmoved = borrow;
msg = MAL_SUCCEED;
end:
- // if (left)
- // dump_block("fixlines outgoing left", left, *left_skip_amount);
- // if (right)
- // dump_block("fixlines outgoing right", right, borrow);
- // if (left || right)
- // fprintf(stderr, "\n");
+
+#ifdef BLOCK_DEBUG
+ fprintf(stderr, "fixlines returning %ld lines\n", *ret_linecount);
+ if (new_left)
+ dump_block("fixlines outgoing new_left", new_left);
+ if (new_right)
+ dump_block("fixlines outgoing new_right", new_right);
+ if (left || right)
+ fprintf(stderr, "\n");
+#endif
+
if (left != NULL)
BBPunfix(left->batCacheid);
if (right != NULL)
BBPunfix(right->batCacheid);
+ // new_left and new_right are aliases of left and right, but not necessarily in that order.
+ if (new_left != NULL) {
+ *ret_left = new_left->batCacheid;
+ BBPretain(new_left->batCacheid);
+ }
+ if (new_right != NULL) {
+ *ret_right = new_right->batCacheid;
+ BBPretain(new_right->batCacheid);
+ }
return msg;
}
@@ -640,6 +694,7 @@ scan_fields(
return 0;
}
+
static str
COPYsplitlines(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
@@ -652,14 +707,15 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
int ncols = pci->retc;
int line_sep, col_sep, quote;
+ assert(pci->argc == pci->retc + 7);
bat block_bat_id = *getArgReference_bat(stk, pci, pci->retc + 0);
- int skip_amount = *getArgReference_int(stk, pci, pci->retc + 1);
- lng line_count = *getArgReference_lng(stk, pci, pci->retc + 2);
- str col_sep_str = *getArgReference_str(stk, pci, pci->retc + 3);
- str line_sep_str = *getArgReference_str(stk, pci, pci->retc + 4);
- str quote_str = *getArgReference_str(stk, pci, pci->retc + 5);
- str null_repr = *getArgReference_str(stk, pci, pci->retc + 6);
- bool backslash_escapes = *getArgReference_bit(stk, pci, pci->retc + 7);
+ lng line_count = *getArgReference_lng(stk, pci, pci->retc + 1);
+ str col_sep_str = *getArgReference_str(stk, pci, pci->retc + 2);
+ str line_sep_str = *getArgReference_str(stk, pci, pci->retc + 3);
+ str quote_str = *getArgReference_str(stk, pci, pci->retc + 4);
+ str null_repr = *getArgReference_str(stk, pci, pci->retc + 5);
+ bool backslash_escapes = *getArgReference_bit(stk, pci, pci->retc + 6);
+
line_sep = get_sep_char(line_sep_str, backslash_escapes);
if (line_sep <= 0) // 0 not ok
@@ -677,8 +733,7 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
if ((block_bat = BATdescriptor(block_bat_id)) == NULL)
bailout("copy.splitlines", SQLSTATE(HY002)
RUNTIME_OBJECT_MISSING);
- assert(skip_amount >= 0);
- assert( (line_count == 0) == (skip_amount == (int)BATcount(block_bat))
);
+ assert( (line_count == 0) == (BATcount(block_bat) ==
block_bat->batInserted) );
return_bats = GDKzalloc(ncols * sizeof(*return_bats));
return_indices = GDKzalloc(ncols * sizeof(*return_indices));
@@ -693,10 +748,12 @@ COPYsplitlines(Client cntxt, MalBlkPtr m
return_indices[i] = Tloc(b, 0);
}
- // dump_block("splitlines", block_bat, skip_amount);
+#ifdef BLOCK_DEBUG
+ dump_block("splitlines", block_bat);
+#endif
int ret;
ret = scan_fields(
- Tloc(block_bat, 0), skip_amount, Tloc(block_bat,
BATcount(block_bat)),
+ Tloc(block_bat, 0), (int)block_bat->batInserted,
Tloc(block_bat, BATcount(block_bat)),
col_sep, line_sep, quote, backslash_escapes, null_repr,
ncols, line_count,
return_indices);
@@ -926,13 +983,13 @@ static mel_func copy_init_funcs[] = {
arg("stream", streams), arg("block_size", lng), batarg("block",
bte)
)),
command("copy", "fixlines", COPYfixlines, true, "Copy bytes from 'right' to
'left' to complete the final line of 'left'. Return left line count and bytes
copied",
- args(2, 8,
- arg("linecount", lng), arg("bytesmoved", int),
- batarg("left",bte), arg("left_skip", int), batarg("right", bte),
arg("linesep", str), arg("quote", str), arg("escape", bit)
+ args(3, 8,
+ batarg("new_left", bte), batarg("new_right", bte), arg("linecount",
lng),
+ batarg("left", bte), batarg("right", bte), arg("linesep", str),
arg("quote", str), arg("escape", bit),
)),
- pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the
individual columns", args(1, 9,
+ pattern("copy", "splitlines", COPYsplitlines, false, "Find the fields of the
individual columns", args(1, 8,
batvararg("", int),
- batarg("block", bte), arg("skip", int), arg("linecount", lng),
arg("col_sep", str), arg("line_sep", str), arg("quote", str), arg("null_repr",
str), arg("escape", bit)
+ batarg("block", bte), arg("linecount", lng), arg("col_sep", str),
arg("line_sep", str), arg("quote", str), arg("null_repr", str), arg("escape",
bit)
)),
pattern("copy", "parse_generic", COPYparse_generic, false, "Parse as an
integer", args(1, 4,
diff --git a/sql/backends/monet5/rel_copy.c b/sql/backends/monet5/rel_copy.c
--- a/sql/backends/monet5/rel_copy.c
+++ b/sql/backends/monet5/rel_copy.c
@@ -79,7 +79,6 @@ emit_send(MalBlkPtr mb, int var_channel,
struct loop_vars {
int loop_barrier;
int our_block;
- int our_skip_amount;
int our_line_count;
};
@@ -104,10 +103,6 @@ emit_onserver(
q = pushLng(mb, q, alloc);
int var_block_channel = getDestVar(q);
- q = newAssignment(mb);
- q = pushInt(mb, q, 0);
- int var_skip_amounts_channel = getDestVar(q);
-
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]