Changeset: 3d3be33030f9 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3d3be33030f9
Modified Files:
	monetdb5/modules/mal/tablet.c
	sql/backends/monet5/sql_scenario.c
	sql/storage/bat/bat_storage.c
	sql/test/BugTracker-2010/Tests/incomplete-utf8-sequence.Bug-2575.stable.err
	sql/test/BugTracker-2018/Tests/sqlitelogictest-having-not-null-not-in.Bug-6557.stable.out
	sql/test/subquery/Tests/correlated.sql
Branch: grouping-analytics
Log Message:
Merge with default
diffs (truncated from 603 to 300 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -1226,6 +1226,39 @@ SQLworkdivider(READERtask *task, READERt
* If we end up with unfinished records, then the rowlimit will terminate the
process.
*/
+typedef unsigned char (*dfa_t)[256]; /* transition table: one 256-entry row per state, indexed by the next input byte; each entry is the state to move to */
+/* Build the transition table of a small DFA that recognizes the
+ * separator string `sep' (length `seplen' bytes) in a byte stream.
+ *
+ * The table has seplen rows, for states 0 .. seplen-1. State 0
+ * means "nothing matched"; being in state n means the last n bytes
+ * seen are the first n bytes of sep. State seplen itself has no
+ * row: the visible caller (SQLproducer) stops scanning as soon as a
+ * lookup yields seplen. The code relies on GDKzalloc handing back
+ * zeroed memory, so every byte without an explicit transition
+ * falls back to state 0.
+ *
+ * Returns the table, which the caller releases with GDKfree, or
+ * NULL when the allocation fails.
+ *
+ * NOTE(review): states are stored in an unsigned char, so this
+ * presumably expects seplen <= 255; neither that nor seplen == 0
+ * is checked here -- confirm against the callers. */
+static dfa_t
+mkdfa(const unsigned char *sep, size_t seplen)
+{
+ dfa_t dfa;
+ size_t i, j, k;
+
+ dfa = GDKzalloc(seplen * sizeof(*dfa));
+ if (dfa == NULL)
+ return NULL;
+ /* Each character in the separator string advances the state by
+ * one. If state reaches seplen, the separator was recognized.
+ *
+ * The first loop and the nested loop make sure that if in any
+ * state we encounter an invalid character, but part of what we've
+ * matched so far is a prefix of the separator, we go to the
+ * appropriate state.
+ *
+ * In the nested loop, j is the current state (number of separator
+ * bytes matched so far) and k is the start of a shorter tail
+ * sep[k..j) of that matched text. If the tail equals the first
+ * j-k bytes of the separator, then seeing byte sep[j-k] next
+ * continues that shorter match instead of dropping to state 0. */
+ for (i = 0; i < seplen; i++)
+ dfa[i][sep[0]] = 1; /* from any state, the first separator byte (re)starts a match */
+ for (j = 0; j < seplen; j++) {
+ dfa[j][sep[j]] = j + 1; /* the expected byte advances the state; NOTE(review): implicit size_t -> unsigned char narrowing here, unlike the explicit cast further down */
+ for (k = 0; k < j; k++) {
+ for (i = 0; i < j - k; i++) /* compare the tail sep[k..j) against the prefix sep[0..j-k) */
+ if (sep[k + i] != sep[i])
+ break;
+ if (i == j - k && dfa[j][sep[i]] <= i) /* tail is a prefix; only overwrite a target that is not already a longer match */
+ dfa[j][sep[i]] = (unsigned char) (i + 1);
+ }
+ }
+ return dfa;
+}
+
static void
SQLproducer(void *p)
{
@@ -1239,12 +1272,21 @@ SQLproducer(void *p)
const char *rsep = task->rsep;
size_t rseplen = strlen(rsep), partial = 0;
char quote = task->quote;
+ dfa_t rdfa;
+ lng rowno = 1;
MT_sema_down(&task->producer);
if (task->id < 0) {
return;
}
+ rdfa = mkdfa((const unsigned char *) rsep, rseplen);
+ if (rdfa == NULL) {
+ tablet_error(task, lng_nil, int_nil, "cannot allocate memory",
"");
+ ateof[cur] = true;
+ goto reportlackofinput;
+ }
+
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#SQLproducer started size %zu len %zu\n",
task->b->size, task->b->len);
@@ -1268,7 +1310,7 @@ SQLproducer(void *p)
// warn the consumers
if (ateof[cur] && partial) {
if (partial) {
- tablet_error(task, lng_nil, int_nil,
"incomplete record at end of file", s);
+ tablet_error(task, rowno, int_nil, "incomplete
record at end of file", s);
task->b->pos += partial;
}
goto reportlackofinput;
@@ -1276,7 +1318,7 @@ SQLproducer(void *p)
if (task->errbuf && task->errbuf[0]) {
if (GDKerrbuf && GDKerrbuf[0]) {
- tablet_error(task, lng_nil, int_nil, GDKerrbuf,
"SQLload_file");
+ tablet_error(task, rowno, int_nil, GDKerrbuf,
"SQLload_file");
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#bailout on SQLload
%s\n", msg);
#endif
@@ -1301,7 +1343,7 @@ SQLproducer(void *p)
/* the input buffer should be extended, but 'base' is
not shared
between the threads, which we can not now update.
Mimick an ateof instead; */
- tablet_error(task, lng_nil, int_nil, "record too long",
"");
+ tablet_error(task, rowno, int_nil, "record too long",
"");
ateof[cur] = true;
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#bailout on SQLload confronted
with too large record\n");
@@ -1335,111 +1377,67 @@ SQLproducer(void *p)
* user should supply the correct number of fields.
* In the first phase we simply break the lines at the
* record boundary. */
- if (quote == 0) {
- switch (rseplen) {
- case 1:
- for (; *e; e++) {
- if (*e == '\\') {
- if (*++e == 0)
- break;
- continue;
- }
- if (*e == *rsep)
- break;
- }
- break;
- case 2:
- for (; *e; e++) {
- if (*e == '\\') {
- if (*++e == 0)
- break;
- continue;
- }
- if (*e == *rsep && e[1] ==
rsep[1])
- break;
- }
- break;
- default:
- for (; *e; e++) {
- if (*e == '\\') {
- if (*++e == 0)
- break;
- continue;
- }
- if (*e == *rsep && strncmp(e,
rsep, rseplen) == 0)
- break;
- }
- }
- if (*e == 0) {
- partial = e - s;
- e = 0; /* nonterminated record, we
need more */
+ int nutf = 0;
+ int m = 0;
+ bool bs = false;
+ char q = 0;
+ size_t i = 0;
+ while (*e) {
+ /* check for correctly encoded UTF-8 */
+ if (nutf > 0) {
+ if ((*e & 0xC0) != 0x80)
+ goto badutf8;
+ if (m != 0 && (*e & m) == 0)
+ goto badutf8;
+ m = 0;
+ nutf--;
+ } else if ((*e & 0xE0) == 0xC0) {
+ nutf = 1;
+ if ((e[0] & 0x1E) == 0)
+ goto badutf8;
+ } else if ((*e & 0xF0) == 0xE0) {
+ nutf = 2;
+ if ((e[0] & 0x0F) == 0)
+ m = 0x20;
+ } else if ((*e & 0xF8) == 0xF0) {
+ nutf = 3;
+ if ((e[0] & 0x07) == 0)
+ m = 0x30;
+ } else if ((*e & 0x80) != 0) {
+ goto badutf8;
}
- } else {
- char q = 0;
-
- switch (rseplen) {
- case 1:
- for (; *e; e++) {
- if (*e == q)
- q = 0;
- else if (*e == quote)
- q = *e;
- else if (*e == '\\') {
- if (*++e == 0)
- break;
- } else if (!q && *e == *rsep)
- break;
- }
- if (*e == 0) {
- partial = e - s;
- e = 0; /* nonterminated
record, we need more */
- }
- break;
- case 2:
- for (; *e; e++) {
- if (*e == q)
- q = 0;
- else if (*e == quote)
- q = *e;
- else if (*e == '\\') {
- if (e[1])
- e++;
- } else if (!q && e[0] ==
rsep[0] && e[1] == rsep[1])
- break;
- }
- if (*e == 0) {
- partial = e - s;
- e = 0; /* nonterminated
record, we need more */
- }
- break;
- default:
- for (; *e; e++) {
- if (*e == q)
- q = 0;
- else if (*e == quote)
- q = *e;
- else if (*e == '\\') {
- if (*++e == 0)
- break;
- } else if (!q && *e == *rsep &&
strncmp(e, rsep, rseplen) == 0)
- break;
- }
- if (*e == 0) {
- partial = e - s;
- e = 0; /* nonterminated
record, we need more */
- }
+ /* check for quoting and the row separator */
+ if (bs) {
+ bs = false;
+ } else if (*e == '\\') {
+ bs = true;
+ i = 0;
+ } else if (*e == q) {
+ q = 0;
+ } else if (*e == quote) {
+ q = quote;
+ i = 0;
+ } else if (q == 0) {
+ i = rdfa[i][(unsigned char) *e];
+ if (i == rseplen)
+ break;
}
+ e++;
+ }
+ if (*e == 0) {
+ partial = e - s;
+ e = NULL; /* nonterminated
record, we need more */
}
/* check for incomplete line and end of buffer
condition */
if (e) {
+ rowno++;
/* found a complete record, do we need to skip
it? */
if (--task->skip < 0 && cnt < task->maxrow) {
task->lines[cur][task->top[cur]++] = s;
cnt++;
}
- *e = '\0';
- s = e + rseplen;
- e = s;
+ *(e + 1 - rseplen) = 0;
+ s = ++e;
task->b->pos += (size_t) (e - base);
base = e;
if (task->top[cur] == task->limit)
@@ -1448,7 +1446,7 @@ SQLproducer(void *p)
/* found an incomplete record, saved for next
round */
if (s+partial < end) {
/* found a EOS in the input */
- tablet_error(task, lng_nil, int_nil,
"record too long (EOS found)", "");
+ tablet_error(task, rowno, int_nil,
"record too long (EOS found)", "");
ateof[cur] = true;
goto reportlackofinput;
}
@@ -1470,6 +1468,7 @@ SQLproducer(void *p)
/* then wait until it is done */
MT_sema_down(&task->producer);
if (cnt == task->maxrow) {
+ GDKfree(rdfa);
return;
}
} else {
@@ -1483,6 +1482,7 @@ SQLproducer(void *p)
MT_sema_down(&task->producer);
blocked[(cur + 1) % MAXBUFFERS] = false;
if (task->state == ENDOFCOPY) {
+ GDKfree(rdfa);
return;
}
}
@@ -1507,6 +1507,7 @@ SQLproducer(void *p)
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#Producer delivered
all\n");
#endif
+ GDKfree(rdfa);
return;
}
}
@@ -1518,6 +1519,7 @@ SQLproducer(void *p)
#ifdef _DEBUG_TABLET_CNTRL
mnstr_printf(GDKout, "#Producer encountered eof\n");
#endif
+ GDKfree(rdfa);
return;
}
/* consumers ask us to stop? */
@@ -1527,6 +1529,7 @@ SQLproducer(void *p)
mnstr_printf(GDKout, "#SQL producer early exit
%.63s\n",
task->b->buf +
task->b->pos);
#endif
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list
