Changeset: aa611626e4d0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/aa611626e4d0
Modified Files:
        sql/backends/monet5/sql_copyinto.c
Branch: directappend
Log Message:

Get rid of the LoadOps callback struct


diffs (truncated from 737 to 300 lines):

diff --git a/sql/backends/monet5/sql_copyinto.c 
b/sql/backends/monet5/sql_copyinto.c
--- a/sql/backends/monet5/sql_copyinto.c
+++ b/sql/backends/monet5/sql_copyinto.c
@@ -94,19 +94,256 @@ typedef struct Table_t {
        BAT *complaints;                        /* lines that did not match the 
required input */
 } Tablet;
 
-// Callback interface to append the data directly instead of storing it in 
intermediate BATs.
-// SQLload_file doesn't know how to manipulate the sql transaction 
bookkeeping, caller does.
-typedef str (*loadfile_claim_fptr)(void *state, size_t nrows, size_t ncols, 
Column *cols[]);
-typedef str (*loadfile_append_one_fptr)(void *state, size_t idx, const void 
*data, void *col);
-typedef str (*loadfile_append_batch_fptr)(void *state, const void *data, BUN 
count, int width, void *col);
-typedef BAT *(*loadfile_get_offsets_bat_fptr)(void *state);
-typedef struct LoadOps {
-       void *state;
-       loadfile_claim_fptr claim;
-       loadfile_append_one_fptr append_one;
-       loadfile_append_batch_fptr append_batch;
-       loadfile_get_offsets_bat_fptr get_offsets;
-} LoadOps;
+
/*
 * State for appending COPY INTO data directly into the destination table's
 * storage instead of storing it in intermediate BATs.
 */
struct directappend {
	mvc *mvc;            // SQL session whose transaction is used for claims and appends
	sql_table *t;        // destination table whose slots are claimed
	BAT *all_offsets;    // all offsets ever generated (every claimed position, appended per batch)
	BAT *new_offsets;    // scattered positions from the most recent storage_api.claim_tab call;
	                     // directappend_claim may trim it to the non-consecutive prefix; NULL if
	                     // the claim returned a consecutive block
	BUN offset;          // after directappend_claim: first position of the consecutive run of
	                     // rows that follows the rows covered by new_offsets
};
+
+static void
+directappend_destroy(struct directappend *state)
+{
+       if (state == NULL)
+               return;
+       struct directappend *st = state;
+       if (st->all_offsets)
+               BBPreclaim(st->all_offsets);
+       if (st->new_offsets)
+               BBPreclaim(st->new_offsets);
+}
+
+static str
+directappend_init(struct directappend *state, Client cntxt, sql_table *t)
+{
+       str msg = MAL_SUCCEED;
+       *state = (struct directappend) { 0 };
+       backend *be;
+       mvc *mvc;
+
+       msg = checkSQLContext(cntxt);
+       if (msg != MAL_SUCCEED)
+               goto bailout;
+       be = cntxt->sqlcontext;
+       mvc = be->mvc;
+       state->mvc = mvc;
+       state->t = t;
+
+       state->all_offsets = COLnew(0, TYPE_oid, 0, TRANSIENT);
+       if (state->all_offsets == NULL) {
+               msg = createException(SQL, "sql.append_from", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               goto bailout;
+       }
+
+       assert(msg == MAL_SUCCEED);
+       return msg;
+
+bailout:
+       assert(msg != MAL_SUCCEED);
+       directappend_destroy(state);
+       return msg;
+}
+
+static str
+directappend_claim(void *state_, size_t nrows, size_t ncols, Column *cols[])
+{
+       str msg = MAL_SUCCEED;
+
+       // these parameters aren't used right now, useful if we ever also move 
the
+       // old bunfastapp-on-temporary-bats scheme to the callback interface
+       // too, making SQLload_file fully mechanism agnostic.
+       // Then again, maybe just drop them instead.
+       (void)ncols;
+       (void)cols;
+
+       assert(state_ != NULL);
+       struct directappend *state = state_;
+
+       if (state->new_offsets != NULL) {
+               // Leftover from previous round, the logic below counts on it 
not being present.
+               // We can change that but have to do so carefully. for now just 
drop it.
+               BBPreclaim(state->new_offsets);
+               state->new_offsets = NULL;
+       }
+
+       // Allocate room for this batch
+       BUN dummy_offset = 424242424242;
+       state->offset = dummy_offset;
+       sql_trans *tr = state->mvc->session->tr;
+       sqlstore *store = tr->store;
+       int ret = store->storage_api.claim_tab(tr, state->t, nrows, 
&state->offset, &state->new_offsets);
+       // int ret = mvc_claim_slots(state->mvc->session->tr, state->t, nrows, 
&state->offset, &state->new_offsets);
+       if (ret != LOG_OK) {
+               msg = createException(SQL, "sql.append_from", SQLSTATE(3F000) 
"Could not claim slots");
+               goto bailout;
+       }
+
+       // Append the batch to all_offsets.
+       if (state->new_offsets != NULL) {
+               if (BATappend(state->all_offsets, state->new_offsets, NULL, 
false) != GDK_SUCCEED) {
+                       msg = createException(SQL, "sql.append_from", 
SQLSTATE(3F000) "BATappend failed");
+                       goto bailout;
+               }
+       } else {
+               // Help, there must be a BATfunction for this.
+               // Also, maybe we should try to make state->all_offsets a void 
BAT and only
+               // switch to materialized oid's if necessary.
+               BUN oldcount = BATcount(state->all_offsets);
+               BUN newcount = oldcount + nrows;
+               if (BATcapacity(state->all_offsets) < newcount) {
+                       if (BATextend(state->all_offsets, newcount) != 
GDK_SUCCEED) {
+                               msg = createException(SQL, "sql.append_from", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
+                               goto bailout;
+                       }
+               }
+               oid * oo = Tloc(state->all_offsets, oldcount);
+               for (BUN i = 0; i < nrows; i++)
+                       *oo++ = state->offset + i;
+               BATsetcount(state->all_offsets, newcount);
+       }
+
+       // The protocol for mvc_claim is that it returns either a consecutive 
block,
+       // by setting *offset, or a BAT of positions by setting *offsets. 
However,
+       // it is possible and even likely that only the first few items of the 
BAT
+       // are actually scattered positions, while the rest is still a 
consecutive
+       // block at the end. Appending at the end is much cheaper so we peel the
+       // consecutive elements off the back of the BAT and treat them 
separately.
+       //
+       // In the remainder of the function, 'state->newoffsets' holds 
'front_count'
+       // positions if it exists, while another 'back_count' positions start at
+       // 'back_offset'.
+       //
+       // TODO this code has become a little convoluted as it evolved.
+       // Needs straightening out.
+       size_t front_count;
+       size_t back_count;
+       BUN back_offset;
+       if (state->new_offsets != NULL) {
+               if (state->new_offsets->tsorted) {
+                       assert(BATcount(state->new_offsets) >= 1);
+                       BUN start = BATcount(state->new_offsets) - 1;
+                       oid at_start = *(oid*)Tloc(state->new_offsets, start);
+                       while (start > 0) {
+                               oid below_start = 
*(oid*)Tloc(state->new_offsets, start - 1);
+                               if (at_start != below_start + 1)
+                                       break;
+                               start = start - 1;
+                               at_start = below_start;
+                       }
+                       front_count = start;
+                       back_count = nrows - start;
+                       back_offset = at_start;
+                       BATsetcount(state->new_offsets, start);
+               } else {
+                       front_count = nrows;
+                       back_count = 0;
+                       back_offset = dummy_offset;
+               }
+       } else {
+               front_count = 0;
+               back_count = nrows;
+               back_offset = state->offset;
+       }
+       state->offset = back_offset;
+
+       // debugging
+       (void)front_count;
+       (void)back_count;
+       // if (front_count > 0) {
+       //      for (size_t j = 0; j < front_count; j++) {
+       //              BUN pos = (BUN)*(oid*)Tloc(state->new_offsets, j);
+       //              fprintf(stderr, "scattered offset[%zu] = " BUNFMT "\n", 
j, pos);
+       //      }
+       // }
+       // if (back_count > 0) {
+       //      BUN start = back_offset;
+       //      BUN end = start + (BUN)back_count - 1;
+       //      fprintf(stderr, "consecutive offsets: " BUNFMT " .. " 
BUNFMT"\n", start, end);
+       // }
+
+
+       assert(msg == MAL_SUCCEED);
+       return msg;
+
+bailout:
+       assert(msg != MAL_SUCCEED);
+       return msg;
+}
+
+static BAT*
+directappend_get_offsets_bat(struct directappend *state)
+{
+       return state->all_offsets;
+}
+
+static str
+directappend_append_one(struct directappend *state, size_t idx, const void 
*const_data, void *col)
+{
+       BAT *scattered_offsets = state->new_offsets;
+       BUN scattered_count = scattered_offsets ? BATcount(scattered_offsets) : 
0;
+       BUN off;
+       if (idx < scattered_count) {
+               off = *(oid*)Tloc(scattered_offsets, idx);
+               // fprintf(stderr, "Took offset " BUNFMT " from position %zu of 
the offsets BAT\n", off, idx);
+       } else {
+               off = state->offset + (idx - scattered_count);
+               // fprintf(stderr, "Took offset " BUNFMT " as %zu plus base " 
BUNFMT "\n", off, idx, state->offset);
+       }
+
+       sql_column *c = col;
+       int tpe = c->type.type->localtype;
+       sqlstore *store = state->mvc->session->tr->store;
+
+       // unfortunately, append_col_fptr doesn't take const void*.
+       void *data = ATOMextern(tpe) ? &const_data : (void*)const_data;
+       int     ret = store->storage_api.append_col(state->mvc->session->tr, c, 
off, NULL, data, 1, tpe);
+       if (ret != LOG_OK) {
+               throw(SQL, "sql.append", SQLSTATE(42000) "Append failed%s", ret 
== LOG_CONFLICT ? " due to conflict with another transaction" : "");
+       }
+
+       return MAL_SUCCEED;
+}
+
+static str
+directappend_append_batch(struct directappend *state, const void *const_data, 
BUN count, int width, void *col)
+{
+       sqlstore *store = state->mvc->session->tr->store;
+       sql_column *c = col;
+       int tpe = c->type.type->localtype;
+
+       (void)width;
+       assert(width== ATOMsize(tpe));
+
+       BUN scattered_count = state->new_offsets ? BATcount(state->new_offsets) 
: 0;
+
+       int ret = LOG_OK;
+
+       if (scattered_count > 0) {
+               BUN dummy_offset = GDK_oid_max;
+               ret = store->storage_api.append_col(
+                                       state->mvc->session->tr, c,
+                                       dummy_offset, state->new_offsets,
+                                       (void*)const_data, scattered_count, tpe
+               );
+       }
+
+       if (ret == LOG_OK && count > scattered_count) {
+               char *remaining_data = (char*)const_data + scattered_count * 
width;
+               BUN remaining_count = count - scattered_count;
+               ret = store->storage_api.append_col(
+                                       state->mvc->session->tr, c,
+                                       state->offset, NULL,
+                                       remaining_data, remaining_count, tpe
+               );
+       }
+       if (ret != LOG_OK) {
+               throw(SQL, "sql.append", SQLSTATE(42000) "Append failed%s", ret 
== LOG_CONFLICT ? " due to conflict with another transaction" : "");
+       }
+
+       return MAL_SUCCEED;
+}
 
 
 #define MAXWORKERS     64
@@ -408,7 +645,7 @@ typedef struct {
        int besteffort;
        bte *rowerror;
        int errorcnt;
-       LoadOps *loadops;
+       struct directappend *directappend;
        struct scratch_buffer scratch;
        struct scratch_buffer primary;
        struct scratch_buffer secondary;
@@ -689,7 +926,7 @@ report_append_failed(READERtask *task, C
        char *err = NULL;
        /* failure */
        if (task->rowerror) {
-               lng row = BATcount(task->loadops ? 
task->loadops->get_offsets(task->loadops->state) : fmt->c);
+               lng row = BATcount(task->directappend ? 
directappend_get_offsets_bat(task->directappend) : fmt->c);
                MT_lock_set(&errorlock);
                if (task->cntxt->error_row == NULL ||
                        BUNappend(task->cntxt->error_row, &row, false) != 
GDK_SUCCEED ||
@@ -717,7 +954,7 @@ SQLworker_onebyone_column(READERtask *ta
                if (SQLconvert_val(task, col, i, &fmt->data, &fmt->len) < 0)
                        return -1;
                const void *value = fmt->data ? fmt->data : fmt->nildata;
-               str msg = task->loadops->append_one(task->loadops->state, i, 
value, fmt->appendcol);
+               str msg = directappend_append_one(task->directappend, i, value, 
fmt->appendcol);
                if (msg != MAL_SUCCEED) {
                        report_append_failed(task, fmt, i, col + 1);
                        return -1;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to