Changeset: d5c107b03d79 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/d5c107b03d79
Modified Files:
        monetdb5/modules/mal/tablet.c
        sql/ChangeLog.Jul2021
        sql/test/copy/Tests/incorrect_columns.test
Branch: Jul2021
Log Message:

Report the line number for errors in the log, and the row number in
sys.rejects. Make sure all counts are 1-based.
Also change "line" to "row" in the code wherever that is what was
really meant.


diffs (truncated from 575 to 300 lines):

diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -18,7 +18,7 @@
  * that should preferable be maintained.
  *
  * The code below consists of a file reader, which breaks up the
- * file into chunks of distinct lines. Then multiple parallel threads
+ * file into chunks of distinct rows. Then multiple parallel threads
  * grab them, and break them on the field boundaries.
  * After all fields are identified this way, the columns are converted
  * and stored in the BATs.
@@ -572,7 +572,7 @@ TABLEToutput_file(Tablet *as, BAT *order
  * that should preferable be maintained.
  *
  * The code below consists of a file reader, which breaks up the
- * file into chunks of distinct lines. Then multiple parallel threads
+ * file into chunks of distinct rows. Then multiple parallel threads
  * grab them, and break them on the field boundaries.
  * After all fields are identified this way, the columns are converted
  * and stored in the BATs.
@@ -591,7 +591,7 @@ TABLEToutput_file(Tablet *as, BAT *order
 
 /* #define MLOCK_TST did not make a difference on sf10 */
 
-#define BREAKLINE 1
+#define BREAKROW 1
 #define UPDATEBAT 2
 #define SYNCBAT 3
 #define ENDOFCOPY 4
@@ -599,9 +599,9 @@ TABLEToutput_file(Tablet *as, BAT *order
 typedef struct {
        Client cntxt;
        int id;                                         /* for self reference */
-       int state;                                      /* line break=1 , 2 = 
update bat */
+       int state;                                      /* row break=1 , 2 = 
update bat */
        int workers;                            /* how many concurrent ones */
-       int error;                                      /* error during line 
break */
+       int error;                                      /* error during row 
break */
        int next;
        int limit;
        BUN cnt, maxrow;                        /* first row in file chunk. */
@@ -624,10 +624,11 @@ typedef struct {
        size_t seplen, rseplen;
        char quote;
 
-       char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for line 
splitter and tokenizer */
+       char *base[MAXBUFFERS], *input[MAXBUFFERS];     /* buffers for row 
splitter and tokenizer */
        size_t rowlimit[MAXBUFFERS]; /* determines maximal record length buffer 
*/
-       char **lines[MAXBUFFERS];
-       int top[MAXBUFFERS];            /* number of lines in this buffer */
+       char **rows[MAXBUFFERS];
+       lng *startlineno[MAXBUFFERS];
+       int top[MAXBUFFERS];            /* number of rows in this buffer */
        int cur;  /* current buffer used by splitter and update threads */
 
        int *cols;                                      /* columns to handle */
@@ -638,11 +639,11 @@ typedef struct {
 } READERtask;
 
 static void
-tablet_error(READERtask *task, lng row, int col, const char *msg, const char 
*fcn)
+tablet_error(READERtask *task, lng row, lng lineno, int col, const char *msg, 
const char *fcn)
 {
        MT_lock_set(&errorlock);
        if (task->cntxt->error_row != NULL) {
-               if (BUNappend(task->cntxt->error_row, &row, false) != 
GDK_SUCCEED ||
+               if (BUNappend(task->cntxt->error_row, &(lng){(lng)task->cnt + 
row + 1}, false) != GDK_SUCCEED ||
                        BUNappend(task->cntxt->error_fld, &col, false) != 
GDK_SUCCEED ||
                        BUNappend(task->cntxt->error_msg, msg, false) != 
GDK_SUCCEED ||
                        BUNappend(task->cntxt->error_input, fcn, false) != 
GDK_SUCCEED)
@@ -653,11 +654,11 @@ tablet_error(READERtask *task, lng row, 
        if (task->as->error == NULL) {
                if (msg == NULL)
                        task->besteffort = 0;
-               else if (!is_lng_nil(row)) {
+               else if (!is_lng_nil(lineno)) {
                        if (!is_int_nil(col))
-                               task->as->error = createException(MAL, 
"sql.copy_from", "line " LLFMT ": column %d: %s", row + 1, col + 1, msg);
+                               task->as->error = createException(MAL, 
"sql.copy_from", "line " LLFMT ": column %d: %s", lineno, col + 1, msg);
                        else
-                               task->as->error = createException(MAL, 
"sql.copy_from", "line " LLFMT ": %s", row + 1, msg);
+                               task->as->error = createException(MAL, 
"sql.copy_from", "line " LLFMT ": %s", lineno, msg);
                } else
                        task->as->error = createException(MAL, "sql.copy_from", 
"%s", msg);
        }
@@ -666,7 +667,7 @@ tablet_error(READERtask *task, lng row, 
 }
 
 /*
- * The line is broken into pieces directly on their field separators. It 
assumes that we have
+ * The row is broken into pieces directly on their field separators. It 
assumes that we have
  * the record in the cache already, so we can do most work quickly.
  * Furthermore, it assume a uniform (SQL) pattern, without whitespace 
skipping, but with quote and separator.
  */
@@ -780,7 +781,7 @@ SQLload_error(READERtask *task, lng idx,
 
        s = line = GDKmalloc(sz + task->rseplen + 1);
        if (line == 0) {
-               tablet_error(task, idx, int_nil, "SQLload malloc error", 
"SQLload_error");
+               tablet_error(task, idx, lng_nil, int_nil, "SQLload malloc 
error", "SQLload_error");
                return 0;
        }
        for (i = 0; i < attrs; i++) {
@@ -865,7 +866,7 @@ SQLinsert_val(READERtask *task, int col,
                        MT_lock_set(&errorlock);
                        snprintf(buf, sizeof(buf),
                                         "line " LLFMT " field %s '%s' 
expected%s%s%s",
-                                        row, fmt->name ? fmt->name : "", 
fmt->type,
+                                        task->startlineno[task->cur][idx], 
fmt->name ? fmt->name : "", fmt->type,
                                         s ? " in '" : "", s ? s : "", s ? "'" 
: "");
                        GDKfree(s);
                        if (task->as->error == NULL && (task->as->error = 
GDKstrdup(buf)) == NULL)
@@ -927,7 +928,7 @@ SQLworker_column(READERtask *task, int c
        MT_lock_set(&mal_copyLock);
        if (!fmt[col].skip && BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + 
task->next) {
                if (BATextend(fmt[col].c, BATgrows(fmt[col].c) + task->limit) 
!= GDK_SUCCEED) {
-                       tablet_error(task, lng_nil, col, "Failed to extend the 
BAT\n", "SQLworker_column");
+                       tablet_error(task, lng_nil, lng_nil, col, "Failed to 
extend the BAT\n", "SQLworker_column");
                        MT_lock_unset(&mal_copyLock);
                        return -1;
                }
@@ -947,64 +948,65 @@ SQLworker_column(READERtask *task, int c
 }
 
 /*
- * The lines are broken on the column separator. Any error is shown and 
reflected with
+ * The rows are broken on the column separator. Any error is shown and 
reflected with
  * setting the reference of the offending row fields to NULL.
  * This allows the loading to continue, skipping the minimal number of rows.
  * The details about the locations can be inspected from the error table.
  * We also trim the quotes around strings.
  */
 static int
-SQLload_parse_line(READERtask *task, int idx)
+SQLload_parse_row(READERtask *task, int idx)
 {
        BUN i;
        char errmsg[BUFSIZ];
        char ch = *task->csep;
-       char *line = task->lines[task->cur][idx];
+       char *row = task->rows[task->cur][idx];
+       lng startlineno = task->startlineno[task->cur][idx];
        Tablet *as = task->as;
        Column *fmt = as->format;
        bool error = false;
        str errline = 0;
 
        assert(idx < task->top[task->cur]);
-       assert(line);
+       assert(row);
        errmsg[0] = 0;
 
        if (task->quote || task->seplen != 1) {
                for (i = 0; i < as->nr_attrs; i++) {
                        bool quote = false;
-                       task->fields[i][idx] = line;
+                       task->fields[i][idx] = row;
                        /* recognize fields starting with a quote, keep them */
-                       if (*line && *line == task->quote) {
+                       if (*row && *row == task->quote) {
                                quote = true;
-                               task->fields[i][idx] = line + 1;
-                               line = tablet_skip_string(line + 1, 
task->quote, task->escape);
+                               task->fields[i][idx] = row + 1;
+                               row = tablet_skip_string(row + 1, task->quote, 
task->escape);
 
-                               if (!line) {
+                               if (!row) {
                                        errline = SQLload_error(task, idx, i+1);
                                        snprintf(errmsg, BUFSIZ, "Quote (%c) 
missing", task->quote);
-                                       tablet_error(task, idx, (int) i, 
errmsg, errline);
+                                       tablet_error(task, idx, startlineno, 
(int) i + 1, errmsg, errline);
                                        GDKfree(errline);
                                        error = true;
                                        goto errors1;
                                } else
-                                       *line++ = 0;
+                                       *row++ = 0;
                        }
 
                        /* eat away the column separator */
-                       for (; *line; line++)
-                               if (*line == '\\') {
-                                       if (line[1])
-                                               line++;
-                               } else if (*line == ch && (task->seplen == 1 || 
strncmp(line, task->csep, task->seplen) == 0)) {
-                                       *line = 0;
-                                       line += task->seplen;
+                       for (; *row; row++)
+                               if (*row == '\\') {
+                                       if (row[1])
+                                               row++;
+                               } else if (*row == ch && (task->seplen == 1 || 
strncmp(row, task->csep, task->seplen) == 0)) {
+                                       *row = 0;
+                                       row += task->seplen;
                                        goto endoffieldcheck;
                                }
 
                        /* not enough fields */
                        if (i < as->nr_attrs - 1) {
                                errline = SQLload_error(task, idx, i+1);
-                               tablet_error(task, idx, (int) i, "Column value 
missing", errline);
+                               tablet_error(task, idx, startlineno, (int) i + 
1, "Column value missing", errline);
                                GDKfree(errline);
                                error = true;
                          errors1:
@@ -1023,23 +1025,23 @@ SQLload_parse_line(READERtask *task, int
                assert(!task->quote);
                assert(task->seplen == 1);
                for (i = 0; i < as->nr_attrs; i++) {
-                       task->fields[i][idx] = line;
+                       task->fields[i][idx] = row;
 
                        /* eat away the column separator */
-                       for (; *line; line++)
-                               if (*line == '\\') {
-                                       if (line[1])
-                                               line++;
-                               } else if (*line == ch) {
-                                       *line = 0;
-                                       line++;
+                       for (; *row; row++)
+                               if (*row == '\\') {
+                                       if (row[1])
+                                               row++;
+                               } else if (*row == ch) {
+                                       *row = 0;
+                                       row++;
                                        goto endoffield2;
                                }
 
                        /* not enough fields */
                        if (i < as->nr_attrs - 1) {
                                errline = SQLload_error(task, idx,i+1);
-                               tablet_error(task, idx, (int) i, "Column value 
missing", errline);
+                               tablet_error(task, idx, startlineno, (int) i + 
1, "Column value missing", errline);
                                GDKfree(errline);
                                error = true;
                                /* we save all errors detected */
@@ -1056,10 +1058,10 @@ SQLload_parse_line(READERtask *task, int
                }
        }
        /* check for too many values as well*/
-       if (line && *line && i == as->nr_attrs) {
+       if (row && *row && i == as->nr_attrs) {
                errline = SQLload_error(task, idx, task->as->nr_attrs);
-               snprintf(errmsg, BUFSIZ, "Leftover data '%s'",line);
-               tablet_error(task, idx, (int) i, errmsg, errline);
+               snprintf(errmsg, BUFSIZ, "Leftover data '%s'",row);
+               tablet_error(task, idx, startlineno, (int) i + 1, errmsg, 
errline);
                GDKfree(errline);
                error = true;
        }
@@ -1081,15 +1083,15 @@ SQLworker(void *arg)
        while (task->top[task->cur] >= 0) {
                MT_sema_down(&task->sema);
 
-               /* stage one, break the lines spread the worker over the 
workers */
+               /* stage one, break the rows spread the work over the workers */
                switch (task->state) {
-               case BREAKLINE:
+               case BREAKROW:
                        t0 = GDKusec();
                        piece = (task->top[task->cur] + task->workers) / 
task->workers;
 
                        for (j = piece * task->id; j < task->top[task->cur] && 
j < piece * (task->id +1); j++)
-                               if (task->lines[task->cur][j]) {
-                                       if (SQLload_parse_line(task, j) < 0) {
+                               if (task->rows[task->cur][j]) {
+                                       if (SQLload_parse_row(task, j) < 0) {
                                                task->errorcnt++;
                                                // early break unless best 
effort
                                                if (!task->besteffort) {
@@ -1184,7 +1186,7 @@ SQLworkdivider(READERtask *task, READERt
 
 /*
  * Reading is handled by a separate task as a preparation for more parallelism.
- * A buffer is filled with proper lines.
+ * A buffer is filled with proper rows.
  * If we are reading from a file then a double buffering scheme ia activated.
  * Reading from the console (stdin) remains single buffered only.
  * If we end up with unfinished records, then the rowlimit will terminate the 
process.
@@ -1248,6 +1250,8 @@ SQLproducer(void *p)
        char quote = task->quote;
        dfa_t rdfa;
        lng rowno = 0;
+       lng lineno = 1;
+       lng startlineno = 1;
        int more = 0;
 
        MT_sema_down(&task->producer);
@@ -1257,7 +1261,7 @@ SQLproducer(void *p)
 
        rdfa = mkdfa((const unsigned char *) rsep, rseplen);
        if (rdfa == NULL) {
-               tablet_error(task, lng_nil, int_nil, "cannot allocate memory", 
"");
+               tablet_error(task, lng_nil, lng_nil, int_nil, "cannot allocate 
memory", "");
                ateof[cur] = true;
                goto reportlackofinput;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to