Changeset: a9b241e9a4c4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a9b241e9a4c4
Added Files:
sql/scripts/27_rejects.sql
Modified Files:
monetdb5/mal/mal_client.c
monetdb5/mal/mal_client.h
monetdb5/modules/mal/tablet.c
monetdb5/modules/mal/tablet.h
sql/backends/monet5/sql.c
sql/backends/monet5/sql.mal
sql/backends/monet5/sql_result.c
sql/backends/monet5/sql_result.h
sql/scripts/Makefile.ag
Branch: resultset
Log Message:
Reject code improvements
- make the reject tables part of the client context to reduce interference.
- add a best-effort parameter to the calls
- clean up the return code structure in tablet.c
- add a reject row vector for vacuum
- add a script to make the reject functionality visible at the SQL layer.
diffs (truncated from 729 to 300 lines):
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -253,6 +253,7 @@ MCinitClientRecord(Client c, oid user, b
c->totaltime = 0;
/* create a recycler cache */
c->exception_buf_initialized = 0;
+ c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
MT_sema_init(&c->s, 0, "Client->s");
return c;
}
@@ -387,6 +388,13 @@ freeClient(Client c)
c->mode = MCshutdowninprogress()? BLOCKCLIENT: FREECLIENT;
GDKfree(c->glb);
c->glb = NULL;
+ if( c->error_row){
+ BBPdecref(c->error_row->batCacheid,TRUE);
+ BBPdecref(c->error_fld->batCacheid,TRUE);
+ BBPdecref(c->error_msg->batCacheid,TRUE);
+ BBPdecref(c->error_input->batCacheid,TRUE);
+ c->error_row = c->error_fld = c->error_msg = c->error_input =
NULL;
+ }
if (t)
THRdel(t); /* you may perform suicide */
MT_sema_destroy(&c->s);
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -169,6 +169,13 @@ typedef struct CLIENT {
* contexts at the same time are in use.
*/
void *sqlcontext;
+ /*
+ * Errors during copy into are collected in a user specific column
set
+ */
+ BAT *error_row;
+ BAT *error_fld;
+ BAT *error_msg;
+ BAT *error_input;
} *Client, ClientRec;
mal_export void MCinit(void);
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -55,29 +55,8 @@
/* #define SQLLOADTHREAD */ /* define to get separate reader thread
*/
-/* Keep a list of rejected records for manual correction */
-static BAT *error_row;
-static BAT *error_fld;
-static BAT *error_msg;
-static BAT *error_input;
-
static MT_Lock errorlock MT_LOCK_INITIALIZER("errorlock");
-static void
-tablet_error(BUN row, int col, Tablet *as, str msg, str fcn)
-{
- if( error_row){
- MT_lock_set(&errorlock, "tablet_error");
- BUNappend(error_row, &row, FALSE);
- BUNappend(error_fld, &col, FALSE);
- BUNappend(error_msg, msg, FALSE);
- BUNappend(error_input, fcn, FALSE);
- if (as->error == NULL && (msg == NULL || (as->error =
GDKstrdup(msg)) == NULL))
- as->error = M5OutOfMemory;
- MT_lock_unset(&errorlock, "tablet_error");
- }
-}
-
static BAT *
void_bat_create(int adt, BUN nr)
{
@@ -223,36 +202,30 @@ check_BATs(Tablet *as)
return base;
}
-int
+str
TABLETcreate_bats(Tablet *as, BUN est)
{
Column *fmt = as->format;
- BUN i;
- char errbuf[42];
+ BUN i,j;
for (i = 0; i < as->nr_attrs; i++) {
fmt[i].c = void_bat_create(fmt[i].adt, est);
fmt[i].ci = bat_iterator(fmt[i].c);
if (!fmt[i].c) {
- snprintf(errbuf, sizeof(errbuf), "Failed to create bat
of size " BUNFMT "\n", as->nr);
- tablet_error(oid_nil, i, as, errbuf,
"TABLETcreate_bats");
- return -1;
+ for(j=0; j<i; j++)
+ BBPdecref(fmt[j].c->batCacheid, FALSE);
+ throw(SQL,"copy","Failed to create bat of size " BUNFMT
"\n", as->nr);
}
}
- return 0;
+ return MAL_SUCCEED;
}
-BAT **
-TABLETcollect(Tablet *as)
+str
+TABLETcollect(BAT **bats, Tablet *as)
{
- BAT **bats = GDKmalloc(sizeof(BAT *) * as->nr_attrs);
Column *fmt = as->format;
BUN i;
BUN cnt = BATcount(fmt[0].c);
- char errbuf[42];
-
- if (bats == NULL)
- return NULL;
for (i = 0; i < as->nr_attrs; i++) {
bats[i] = fmt[i].c;
@@ -260,27 +233,19 @@ TABLETcollect(Tablet *as)
BATsetaccess(fmt[i].c, BAT_READ);
BATderiveProps(fmt[i].c, 1);
- if (cnt != BATcount(fmt[i].c)) {
- snprintf(errbuf, sizeof(errbuf), "Error: count " BUNFMT
" differs from " BUNFMT "\n", BATcount(fmt[i].c), cnt);
- tablet_error(oid_nil, i,as,errbuf,"TABLETcollect");
- GDKfree(bats);
- return NULL;
- }
+ if (cnt != BATcount(fmt[i].c))
+ throw(SQL,"copy", "Count " BUNFMT " differs from "
BUNFMT "\n", BATcount(fmt[i].c), cnt);
}
- return bats;
+ return MAL_SUCCEED;
}
-BAT **
-TABLETcollect_parts(Tablet *as, BUN offset)
+str
+TABLETcollect_parts(BAT **bats, Tablet *as, BUN offset)
{
- BAT **bats = GDKmalloc(sizeof(BAT *) * as->nr_attrs);
Column *fmt = as->format;
BUN i;
BUN cnt = BATcount(fmt[0].c);
- char errbuf[42];
- if (bats == NULL)
- return NULL;
for (i = 0; i < as->nr_attrs; i++) {
BAT *b = fmt[i].c;
BAT *bv = NULL;
@@ -305,14 +270,10 @@ TABLETcollect_parts(Tablet *as, BUN offs
BBPunfix(bv->batCacheid);
bats[i] = BATslice(b, offset, BATcount(b));
}
- if (cnt != BATcount(b)) {
- snprintf(errbuf, sizeof(errbuf), "Error: count " BUNFMT
" differs from " BUNFMT "\n", BATcount(b), cnt);
- tablet_error(oid_nil, i, as,errbuf,
"TABLETcollect_parts");
- GDKfree(bats);
- return NULL;
- }
+ if (cnt != BATcount(b))
+ throw(SQL,"copy", "Count " BUNFMT " differs from "
BUNFMT "\n", BATcount(b), cnt);
}
- return bats;
+ return MAL_SUCCEED;
}
static char *
@@ -694,6 +655,7 @@ TABLEToutput_file(Tablet *as, BAT *order
#define UPDATEBAT 2
typedef struct {
+ Client cntxt;
int id; /* for self reference */
int state; /* line break=1 , 2 =
update bat */
int workers; /* how many concurrent ones */
@@ -721,8 +683,30 @@ typedef struct {
size_t basesize;
int *cols; /* columns to handle */
char ***fields;
+ int besteffort;
+ bte *rowerror;
+ int errorcnt;
} READERtask;
+static void
+tablet_error(READERtask *task, lng row, int col, str msg, str fcn)
+{
+ if( task->cntxt->error_row == NULL){
+ MT_lock_set(&errorlock, "tablet_error");
+ BUNappend(task->cntxt->error_row, &row, FALSE);
+ BUNappend(task->cntxt->error_fld, &col, FALSE);
+ BUNappend(task->cntxt->error_msg, msg, FALSE);
+ BUNappend(task->cntxt->error_input, fcn, FALSE);
+ if (task->as->error == NULL && (msg == NULL || (task->as->error
= GDKstrdup(msg)) == NULL))
+ task->as->error = M5OutOfMemory;
+ if ( row != lng_nil){
+ task->rowerror[row]++;
+ task->errorcnt++;
+ }
+ MT_lock_unset(&errorlock, "tablet_error");
+ }
+}
+
/*
* The line is broken into pieces directly on their field separators. It
assumes that we have
* the record in the cache already, so we can do most work quickly.
@@ -730,7 +714,7 @@ typedef struct {
*/
static str
-SQLload_error(READERtask *task, int idx)
+SQLload_error(READERtask *task, lng idx)
{
str line;
size_t sz = 0;
@@ -744,7 +728,7 @@ SQLload_error(READERtask *task, int idx)
line = (str) GDKmalloc(sz + task->rseplen + 1);
if (line == 0) {
- tablet_error((BUN) idx, int_nil,task->as, "SQLload malloc
error","SQLload_error");
+ tablet_error(task, idx, int_nil, "SQLload malloc
error","SQLload_error");
return 0;
}
line[0] = 0;
@@ -765,9 +749,8 @@ SQLload_error(READERtask *task, int idx)
* and interpret the body.
*/
static inline void
-SQLinsert_val(READERtask *task, int col, int idx)
+SQLinsert_val(READERtask *task, int col, lng idx)
{
- Tablet *as = task->as;
Column *fmt = task->as->format+col;
const void *adt;
char buf[BUFSIZ];
@@ -805,13 +788,14 @@ SQLinsert_val(READERtask *task, int col,
BUN row = BATcount(fmt->c);
snprintf(buf, BUFSIZ, "'%s' expected ", fmt->type);
err = SQLload_error(task,idx);
- //tablet_error(BATcount(fmt->c), col, as, buf, err);
- if( error_row){
+ if( task->rowerror){
MT_lock_set(&errorlock, "insert_val");
- BUNappend(error_row, &row, FALSE);
- BUNappend(error_fld, &col, FALSE);
- BUNappend(error_msg, buf, FALSE);
- BUNappend(error_input, err, FALSE);
+ BUNappend(task->cntxt->error_row, &row, FALSE);
+ BUNappend(task->cntxt->error_fld, &col, FALSE);
+ BUNappend(task->cntxt->error_msg, buf, FALSE);
+ BUNappend(task->cntxt->error_input, err, FALSE);
+ task->rowerror[(int)row]++;
+ task->errorcnt++;
MT_lock_unset(&errorlock, "insert_val");
}
GDKfree(err);
@@ -823,29 +807,42 @@ SQLinsert_val(READERtask *task, int col,
bunfastins(fmt->c, key, adt);
return ;
bunins_failed:
- tablet_error(BATcount(fmt->c), col, as, "insert failed", "");
+ if( task->rowerror){
+ BUN row = BATcount(fmt->c);
+ MT_lock_set(&errorlock, "insert_val");
+ BUNappend(task->cntxt->error_row, &row, FALSE);
+ BUNappend(task->cntxt->error_fld, &col, FALSE);
+ BUNappend(task->cntxt->error_msg, "insert failed", FALSE);
+ BUNappend(task->cntxt->error_input, err, FALSE);
+ task->rowerror[(int)row]++;
+ task->errorcnt++;
+ MT_lock_unset(&errorlock, "insert_val");
+ }
}
static int
SQLworker_column(READERtask *task, int col)
{
- int i;
+ lng i;
Column *fmt = task->as->format;
/* watch out for concurrent threads */
MT_lock_set(&mal_copyLock, "tablet insert value");
if (BATcapacity(fmt[col].c) < BATcount(fmt[col].c) + task->next) {
if ((fmt[col].c = BATextend(fmt[col].c, BATgrows(fmt[col].c) +
task->limit)) == NULL) {
- tablet_error(oid_nil,col,task->as,"Failed to extend the
BAT, perhaps disk full\n","SQLworker_column");
+ tablet_error(task, lng_nil, col, "Failed to extend the
BAT, perhaps disk full\n","SQLworker_column");
MT_lock_unset(&mal_copyLock, "tablet insert value");
return -1;
}
}
MT_lock_unset(&mal_copyLock, "tablet insert value");
- for (i = 0; i < task->next; i++)
+ for (i = 0; i < task->next; i++){
if (task->fields[col][i])
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list