Changeset: d5c107b03d79 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/d5c107b03d79
Modified Files:
monetdb5/modules/mal/tablet.c
sql/ChangeLog.Jul2021
sql/test/copy/Tests/incorrect_columns.test
Branch: Jul2021
Log Message:
Report the line number for errors in the log, and the row number in
sys.rejects, making sure all counts are 1-based.
Also change "line" into "row" in the code where that is what was really
meant.
diffs (truncated from 575 to 300 lines):
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -18,7 +18,7 @@
* that should preferable be maintained.
*
* The code below consists of a file reader, which breaks up the
- * file into chunks of distinct lines. Then multiple parallel threads
+ * file into chunks of distinct rows. Then multiple parallel threads
* grab them, and break them on the field boundaries.
* After all fields are identified this way, the columns are converted
* and stored in the BATs.
@@ -572,7 +572,7 @@ TABLEToutput_file(Tablet *as, BAT *order
* that should preferable be maintained.
*
* The code below consists of a file reader, which breaks up the
- * file into chunks of distinct lines. Then multiple parallel threads
+ * file into chunks of distinct rows. Then multiple parallel threads
* grab them, and break them on the field boundaries.
* After all fields are identified this way, the columns are converted
* and stored in the BATs.
@@ -591,7 +591,7 @@ TABLEToutput_file(Tablet *as, BAT *order
/* #define MLOCK_TST did not make a difference on sf10 */
-#define BREAKLINE 1
+#define BREAKROW 1
#define UPDATEBAT 2
#define SYNCBAT 3
#define ENDOFCOPY 4
@@ -599,9 +599,9 @@ TABLEToutput_file(Tablet *as, BAT *order
typedef struct {
Client cntxt;
int id; /* for self reference */
- int state; /* line break=1 , 2 =
update bat */
+ int state; /* row break=1 , 2 =
update bat */
int workers; /* how many concurrent ones */
- int error; /* error during line
break */
+ int error; /* error during row
break */
int next;
int limit;
BUN cnt, maxrow; /* first row in file chunk. */
@@ -624,10 +624,11 @@ typedef struct {
size_t seplen, rseplen;
char quote;
- char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for line
splitter and tokenizer */
+ char *base[MAXBUFFERS], *input[MAXBUFFERS]; /* buffers for row
splitter and tokenizer */
size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer
*/
- char **lines[MAXBUFFERS];
- int top[MAXBUFFERS]; /* number of lines in this buffer */
+ char **rows[MAXBUFFERS];
+ lng *startlineno[MAXBUFFERS];
+ int top[MAXBUFFERS]; /* number of rows in this buffer */
int cur; /* current buffer used by splitter and update threads */
int *cols; /* columns to handle */
@@ -638,11 +639,11 @@ typedef struct {
} READERtask;
static void
-tablet_error(READERtask *task, lng row, int col, const char *msg, const char
*fcn)
+tablet_error(READERtask *task, lng row, lng lineno, int col, const char *msg,
const char *fcn)
{
MT_lock_set(&errorlock);
if (task->cntxt->error_row != NULL) {
- if (BUNappend(task->cntxt->error_row, &row, false) !=
GDK_SUCCEED ||
+ if (BUNappend(task->cntxt->error_row, &(lng){(lng)task->cnt +
row + 1}, false) != GDK_SUCCEED ||
BUNappend(task->cntxt->error_fld, &col, false) !=
GDK_SUCCEED ||
BUNappend(task->cntxt->error_msg, msg, false) !=
GDK_SUCCEED ||
BUNappend(task->cntxt->error_input, fcn, false) !=
GDK_SUCCEED)
@@ -653,11 +654,11 @@ tablet_error(READERtask *task, lng row,
if (task->as->error == NULL) {
if (msg == NULL)
task->besteffort = 0;
- else if (!is_lng_nil(row)) {
+ else if (!is_lng_nil(lineno)) {
if (!is_int_nil(col))
- task->as->error = createException(MAL,
"sql.copy_from", "line " LLFMT ": column %d: %s", row + 1, col + 1, msg);
+ task->as->error = createException(MAL,
"sql.copy_from", "line " LLFMT ": column %d: %s", lineno, col + 1, msg);
else
- task->as->error = createException(MAL,
"sql.copy_from", "line " LLFMT ": %s", row + 1, msg);
+ task->as->error = createException(MAL,
"sql.copy_from", "line " LLFMT ": %s", lineno, msg);
} else
task->as->error = createException(MAL, "sql.copy_from",
"%s", msg);
}
@@ -666,7 +667,7 @@ tablet_error(READERtask *task, lng row,
}
/*
- * The line is broken into pieces directly on their field separators. It
assumes that we have
+ * The row is broken into pieces directly on their field separators. It
assumes that we have
* the record in the cache already, so we can do most work quickly.
* Furthermore, it assume a uniform (SQL) pattern, without whitespace
skipping, but with quote and separator.
*/
@@ -780,7 +781,7 @@ SQLload_error(READERtask *task, lng idx,
s = line = GDKmalloc(sz + task->rseplen + 1);
if (line == 0) {
- tablet_error(task, idx, int_nil, "SQLload malloc error",
"SQLload_error");
+ tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc
error", "SQLload_error");
return 0;
}
for (i = 0; i < attrs; i++) {
@@ -865,7 +866,7 @@ SQLinsert_val(READERtask *task, int col,
MT_lock_set(&errorlock);
snprintf(buf, sizeof(buf),
"line " LLFMT " field %s '%s'
expected%s%s%s",
- row, fmt->name ? fmt->name : "",
fmt->type,
+ task->startlineno[task->cur][idx],
fmt->name ? fmt->name : "", fmt->type,
s ? " in '" : "", s ? s : "", s ? "'"
: "");
GDKfree(s);
if (task->as->error == NULL && (task->as->error =
GDKstrdup(buf)) == NULL)
@@ -927,7 +928,7 @@ SQLworker_column(READERtask *task, int c
MT_lock_set(&mal_copyLock);
if (!fmt[col].skip && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) +
task->next) {
if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit)
!= GDK_SUCCEED) {
- tablet_error(task, lng_nil, col, "Failed to extend the
BAT\n", "SQLworker_column");
+ tablet_error(task, lng_nil, lng_nil, col, "Failed to
extend the BAT\n", "SQLworker_column");
MT_lock_unset(&mal_copyLock);
return -1;
}
@@ -947,64 +948,65 @@ SQLworker_column(READERtask *task, int c
}
/*
- * The lines are broken on the column separator. Any error is shown and
reflected with
+ * The rows are broken on the column separator. Any error is shown and
reflected with
* setting the reference of the offending row fields to NULL.
* This allows the loading to continue, skipping the minimal number of rows.
* The details about the locations can be inspected from the error table.
* We also trim the quotes around strings.
*/
static int
-SQLload_parse_line(READERtask *task, int idx)
+SQLload_parse_row(READERtask *task, int idx)
{
BUN i;
char errmsg[BUFSIZ];
char ch = *task->csep;
- char *line = task->lines[task->cur][idx];
+ char *row = task->rows[task->cur][idx];
+ lng startlineno = task->startlineno[task->cur][idx];
Tablet *as = task->as;
Column *fmt = as->format;
bool error = false;
str errline = 0;
assert(idx < task->top[task->cur]);
- assert(line);
+ assert(row);
errmsg[0] = 0;
if (task->quote || task->seplen != 1) {
for (i = 0; i < as->nr_attrs; i++) {
bool quote = false;
- task->fields[i][idx] = line;
+ task->fields[i][idx] = row;
/* recognize fields starting with a quote, keep them */
- if (*line && *line == task->quote) {
+ if (*row && *row == task->quote) {
quote = true;
- task->fields[i][idx] = line + 1;
- line = tablet_skip_string(line + 1,
task->quote, task->escape);
+ task->fields[i][idx] = row + 1;
+ row = tablet_skip_string(row + 1, task->quote,
task->escape);
- if (!line) {
+ if (!row) {
errline = SQLload_error(task, idx, i+1);
snprintf(errmsg, BUFSIZ, "Quote (%c)
missing", task->quote);
- tablet_error(task, idx, (int) i,
errmsg, errline);
+ tablet_error(task, idx, startlineno,
(int) i + 1, errmsg, errline);
GDKfree(errline);
error = true;
goto errors1;
} else
- *line++ = 0;
+ *row++ = 0;
}
/* eat away the column separator */
- for (; *line; line++)
- if (*line == '\\') {
- if (line[1])
- line++;
- } else if (*line == ch && (task->seplen == 1 ||
strncmp(line, task->csep, task->seplen) == 0)) {
- *line = 0;
- line += task->seplen;
+ for (; *row; row++)
+ if (*row == '\\') {
+ if (row[1])
+ row++;
+ } else if (*row == ch && (task->seplen == 1 ||
strncmp(row, task->csep, task->seplen) == 0)) {
+ *row = 0;
+ row += task->seplen;
goto endoffieldcheck;
}
/* not enough fields */
if (i < as->nr_attrs - 1) {
errline = SQLload_error(task, idx, i+1);
- tablet_error(task, idx, (int) i, "Column value
missing", errline);
+ tablet_error(task, idx, startlineno, (int) i +
1, "Column value missing", errline);
GDKfree(errline);
error = true;
errors1:
@@ -1023,23 +1025,23 @@ SQLload_parse_line(READERtask *task, int
assert(!task->quote);
assert(task->seplen == 1);
for (i = 0; i < as->nr_attrs; i++) {
- task->fields[i][idx] = line;
+ task->fields[i][idx] = row;
/* eat away the column separator */
- for (; *line; line++)
- if (*line == '\\') {
- if (line[1])
- line++;
- } else if (*line == ch) {
- *line = 0;
- line++;
+ for (; *row; row++)
+ if (*row == '\\') {
+ if (row[1])
+ row++;
+ } else if (*row == ch) {
+ *row = 0;
+ row++;
goto endoffield2;
}
/* not enough fields */
if (i < as->nr_attrs - 1) {
errline = SQLload_error(task, idx,i+1);
- tablet_error(task, idx, (int) i, "Column value
missing", errline);
+ tablet_error(task, idx, startlineno, (int) i +
1, "Column value missing", errline);
GDKfree(errline);
error = true;
/* we save all errors detected */
@@ -1056,10 +1058,10 @@ SQLload_parse_line(READERtask *task, int
}
}
/* check for too many values as well*/
- if (line && *line && i == as->nr_attrs) {
+ if (row && *row && i == as->nr_attrs) {
errline = SQLload_error(task, idx, task->as->nr_attrs);
- snprintf(errmsg, BUFSIZ, "Leftover data '%s'",line);
- tablet_error(task, idx, (int) i, errmsg, errline);
+ snprintf(errmsg, BUFSIZ, "Leftover data '%s'",row);
+ tablet_error(task, idx, startlineno, (int) i + 1, errmsg,
errline);
GDKfree(errline);
error = true;
}
@@ -1081,15 +1083,15 @@ SQLworker(void *arg)
while (task->top[task->cur] >= 0) {
MT_sema_down(&task->sema);
- /* stage one, break the lines spread the worker over the
workers */
+ /* stage one, break the rows spread the work over the workers */
switch (task->state) {
- case BREAKLINE:
+ case BREAKROW:
t0 = GDKusec();
piece = (task->top[task->cur] + task->workers) /
task->workers;
for (j = piece * task->id; j < task->top[task->cur] &&
j < piece * (task->id +1); j++)
- if (task->lines[task->cur][j]) {
- if (SQLload_parse_line(task, j) < 0) {
+ if (task->rows[task->cur][j]) {
+ if (SQLload_parse_row(task, j) < 0) {
task->errorcnt++;
// early break unless best
effort
if (!task->besteffort) {
@@ -1184,7 +1186,7 @@ SQLworkdivider(READERtask *task, READERt
/*
* Reading is handled by a separate task as a preparation for more parallelism.
- * A buffer is filled with proper lines.
+ * A buffer is filled with proper rows.
* If we are reading from a file then a double buffering scheme ia activated.
* Reading from the console (stdin) remains single buffered only.
* If we end up with unfinished records, then the rowlimit will terminate the
process.
@@ -1248,6 +1250,8 @@ SQLproducer(void *p)
char quote = task->quote;
dfa_t rdfa;
lng rowno = 0;
+ lng lineno = 1;
+ lng startlineno = 1;
int more = 0;
MT_sema_down(&task->producer);
@@ -1257,7 +1261,7 @@ SQLproducer(void *p)
rdfa = mkdfa((const unsigned char *) rsep, rseplen);
if (rdfa == NULL) {
- tablet_error(task, lng_nil, int_nil, "cannot allocate memory",
"");
+ tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate
memory", "");
ateof[cur] = true;
goto reportlackofinput;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list