Changeset: aa611626e4d0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/aa611626e4d0
Modified Files:
sql/backends/monet5/sql_copyinto.c
Branch: directappend
Log Message:
Get rid of the LoadOps callback struct
diffs (truncated from 737 to 300 lines):
diff --git a/sql/backends/monet5/sql_copyinto.c
b/sql/backends/monet5/sql_copyinto.c
--- a/sql/backends/monet5/sql_copyinto.c
+++ b/sql/backends/monet5/sql_copyinto.c
@@ -94,19 +94,256 @@ typedef struct Table_t {
BAT *complaints; /* lines that did not match the
required input */
} Tablet;
-// Callback interface to append the data directly instead of storing it in intermediate BATs.
-// SQLload_file doesn't know how to manipulate the sql transaction bookkeeping, caller does.
-typedef str (*loadfile_claim_fptr)(void *state, size_t nrows, size_t ncols, Column *cols[]);
-typedef str (*loadfile_append_one_fptr)(void *state, size_t idx, const void *data, void *col);
-typedef str (*loadfile_append_batch_fptr)(void *state, const void *data, BUN count, int width, void *col);
-typedef BAT *(*loadfile_get_offsets_bat_fptr)(void *state);
-typedef struct LoadOps {
- void *state;
- loadfile_claim_fptr claim;
- loadfile_append_one_fptr append_one;
- loadfile_append_batch_fptr append_batch;
- loadfile_get_offsets_bat_fptr get_offsets;
-} LoadOps;
+
+struct directappend { // state carried across the directappend_* helpers below
+ mvc *mvc; // SQL context; directappend_init copies it from the client's backend (be->mvc)
+ sql_table *t; // table on which directappend_claim claims slots
+ BAT *all_offsets; // every offset claimed so far, accumulated over all directappend_claim calls
+ BAT *new_offsets; // scattered positions of the most recent claim, or NULL; directappend_claim trims a consecutive tail off it
+ BUN offset; // start of the consecutive block of the most recent claim, as set by directappend_claim
+};
+
+/*
+ * Release the BATs referenced by a directappend state.
+ *
+ * Accepts NULL. After the call both BAT pointers in 'state' are NULL, so
+ * calling this again on the same state (for instance once from an error
+ * path and once from the caller's own cleanup) does not release a BAT
+ * twice. The struct itself is not freed; the caller owns its storage.
+ */
+static void
+directappend_destroy(struct directappend *state)
+{
+	if (state == NULL)
+		return;
+	if (state->all_offsets) {
+		BBPreclaim(state->all_offsets);
+		state->all_offsets = NULL;
+	}
+	if (state->new_offsets) {
+		BBPreclaim(state->new_offsets);
+		state->new_offsets = NULL;
+	}
+}
+
+/*
+ * Prepare 'state' for appending directly into table 't'.
+ *
+ * The state is zeroed first, then filled with the mvc of the client's SQL
+ * backend, the target table and a fresh, empty oid BAT that will collect
+ * all claimed offsets.
+ *
+ * Returns MAL_SUCCEED on success. On failure the state is passed to
+ * directappend_destroy and the exception message is returned.
+ */
+static str
+directappend_init(struct directappend *state, Client cntxt, sql_table *t)
+{
+	*state = (struct directappend) { 0 };
+
+	str err = checkSQLContext(cntxt);
+	if (err == MAL_SUCCEED) {
+		backend *be = cntxt->sqlcontext;
+
+		state->mvc = be->mvc;
+		state->t = t;
+		state->all_offsets = COLnew(0, TYPE_oid, 0, TRANSIENT);
+		if (state->all_offsets != NULL)
+			return MAL_SUCCEED;
+
+		err = createException(SQL, "sql.append_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+	}
+
+	// Only failures reach this point.
+	assert(err != MAL_SUCCEED);
+	directappend_destroy(state);
+	return err;
+}
+
+/*
+ * Claim room for 'nrows' new rows in state->t and remember where they go.
+ *
+ * state_ : a struct directappend *, passed as void * (must not be NULL).
+ * nrows  : number of rows to claim.
+ * ncols, cols : currently unused.
+ *
+ * On success:
+ *  - the claimed positions have been appended to state->all_offsets;
+ *  - state->new_offsets is NULL, or holds only the scattered positions at
+ *    the front of the claim (its count may have been reduced, possibly to 0);
+ *  - state->offset is the first position of the consecutive run that
+ *    follows the scattered positions.
+ *
+ * Returns MAL_SUCCEED or an exception message.
+ */
+static str
+directappend_claim(void *state_, size_t nrows, size_t ncols, Column *cols[])
+{
+	// These parameters aren't used right now. They would be useful if the
+	// old bunfastapp-on-temporary-bats scheme ever moved to this interface too.
+	(void)ncols;
+	(void)cols;
+
+	assert(state_ != NULL);
+	struct directappend *state = state_;
+
+	if (state->new_offsets != NULL) {
+		// Leftover from the previous round; the logic below counts on it not being present.
+		BBPreclaim(state->new_offsets);
+		state->new_offsets = NULL;
+	}
+
+	// Allocate room for this batch.
+	// The placeholder is the same one directappend_append_batch uses; unlike
+	// a hard-coded 12-digit literal it fits in BUN whatever its width.
+	const BUN dummy_offset = GDK_oid_max;
+	state->offset = dummy_offset;
+	sql_trans *tr = state->mvc->session->tr;
+	sqlstore *store = tr->store;
+	int ret = store->storage_api.claim_tab(tr, state->t, nrows, &state->offset, &state->new_offsets);
+	if (ret != LOG_OK)
+		throw(SQL, "sql.append_from", SQLSTATE(3F000) "Could not claim slots");
+
+	// Append the batch to all_offsets.
+	if (state->new_offsets != NULL) {
+		if (BATappend(state->all_offsets, state->new_offsets, NULL, false) != GDK_SUCCEED)
+			throw(SQL, "sql.append_from", SQLSTATE(3F000) "BATappend failed");
+	} else {
+		// A single consecutive block: describe it as a dense BAT and let
+		// BATappend copy it. Writing the oids into the heap by hand would
+		// leave the sortedness/key properties of all_offsets out of date.
+		BAT *block = BATdense(0, (oid)state->offset, (BUN)nrows);
+		if (block == NULL)
+			throw(SQL, "sql.append_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+		gdk_return rc = BATappend(state->all_offsets, block, NULL, false);
+		BBPreclaim(block);
+		if (rc != GDK_SUCCEED)
+			throw(SQL, "sql.append_from", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+	}
+
+	// The claim returns either a consecutive block, by setting *offset, or a
+	// BAT of positions, by setting *offsets. However, it is possible and even
+	// likely that only the first few items of the BAT are actually scattered
+	// positions, while the rest is still a consecutive block at the end.
+	// Appending at the end is much cheaper, so we peel the consecutive
+	// elements off the back of the BAT and treat them separately.
+	//
+	// Afterwards state->new_offsets (if any) holds only the scattered front
+	// positions and the remaining positions start at 'back_offset'.
+	//
+	// NOTE(review): Tloc is used directly, so this assumes new_offsets is a
+	// materialized oid BAT rather than a dense/void one -- confirm.
+	BUN back_offset;
+	if (state->new_offsets == NULL) {
+		// Everything is one consecutive block.
+		back_offset = state->offset;
+	} else if (state->new_offsets->tsorted && BATcount(state->new_offsets) > 0) {
+		// Walk backwards while each element is exactly one more than its predecessor.
+		BUN start = BATcount(state->new_offsets) - 1;
+		oid at_start = *(oid*)Tloc(state->new_offsets, start);
+		while (start > 0) {
+			oid below_start = *(oid*)Tloc(state->new_offsets, start - 1);
+			if (at_start != below_start + 1)
+				break;
+			start = start - 1;
+			at_start = below_start;
+		}
+		back_offset = at_start;
+		// Keep only the scattered front part in new_offsets.
+		BATsetcount(state->new_offsets, start);
+	} else {
+		// Unsorted (or empty): every position comes from new_offsets, so
+		// there is no consecutive tail and the offset is a placeholder.
+		back_offset = dummy_offset;
+	}
+	state->offset = back_offset;
+
+	return MAL_SUCCEED;
+}
+
+/*
+ * Accessor for the BAT that collects every offset claimed so far.
+ * The stored pointer is returned as-is; no extra reference is taken here.
+ */
+static BAT*
+directappend_get_offsets_bat(struct directappend *state)
+{
+	BAT *offsets = state->all_offsets;
+
+	return offsets;
+}
+
+/*
+ * Append a single value to column 'col' at the position that belongs to
+ * row 'idx' of the most recent claim.
+ *
+ * Rows with idx below the count of state->new_offsets take their position
+ * from that BAT; later rows are placed relative to state->offset.
+ *
+ * Returns MAL_SUCCEED, or an SQL exception if the storage layer refuses
+ * the append.
+ */
+static str
+directappend_append_one(struct directappend *state, size_t idx, const void *const_data, void *col)
+{
+	sql_trans *tr = state->mvc->session->tr;
+	sqlstore *store = tr->store;
+	sql_column *column = col;
+	int tpe = column->type.type->localtype;
+
+	// How many rows of the current claim have an explicit position?
+	BAT *scattered = state->new_offsets;
+	BUN nscattered = 0;
+	if (scattered != NULL)
+		nscattered = BATcount(scattered);
+
+	BUN pos;
+	if (idx >= nscattered)
+		pos = state->offset + (idx - nscattered);
+	else
+		pos = *(oid*)Tloc(scattered, idx);
+
+	// append_col does not take const void*. For extern atoms the address of
+	// the value pointer is handed over, otherwise the value pointer itself.
+	void *data;
+	if (ATOMextern(tpe))
+		data = &const_data;
+	else
+		data = (void*)const_data;
+
+	int ret = store->storage_api.append_col(tr, column, pos, NULL, data, 1, tpe);
+	if (ret == LOG_OK)
+		return MAL_SUCCEED;
+
+	throw(SQL, "sql.append", SQLSTATE(42000) "Append failed%s", ret == LOG_CONFLICT ? " due to conflict with another transaction" : "");
+}
+
+/*
+ * Append 'count' fixed-width values to column 'col' for the most recent claim.
+ *
+ * The first BATcount(state->new_offsets) values go to the positions listed
+ * in that BAT; whatever remains is appended as one block starting at
+ * state->offset. 'width' is the size of one value and must equal
+ * ATOMsize of the column's type.
+ *
+ * Returns MAL_SUCCEED, or an SQL exception if either append fails.
+ */
+static str
+directappend_append_batch(struct directappend *state, const void *const_data, BUN count, int width, void *col)
+{
+	sql_trans *tr = state->mvc->session->tr;
+	sqlstore *store = tr->store;
+	sql_column *column = col;
+	int tpe = column->type.type->localtype;
+
+	(void)width;
+	assert(width== ATOMsize(tpe));
+
+	BUN nscattered = 0;
+	if (state->new_offsets)
+		nscattered = BATcount(state->new_offsets);
+
+	int ret = LOG_OK;
+
+	// Scattered part: positions are supplied by the offsets BAT, so the
+	// offset argument is only a placeholder.
+	if (nscattered > 0) {
+		BUN placeholder = GDK_oid_max;
+		ret = store->storage_api.append_col(tr, column, placeholder, state->new_offsets, (void*)const_data, nscattered, tpe);
+	}
+
+	// Consecutive part: skip the values consumed above and append the rest
+	// in one go.
+	if (ret == LOG_OK && count > nscattered) {
+		char *tail = (char*)const_data + nscattered * width;
+		BUN ntail = count - nscattered;
+		ret = store->storage_api.append_col(tr, column, state->offset, NULL, tail, ntail, tpe);
+	}
+
+	if (ret != LOG_OK) {
+		throw(SQL, "sql.append", SQLSTATE(42000) "Append failed%s", ret == LOG_CONFLICT ? " due to conflict with another transaction" : "");
+	}
+
+	return MAL_SUCCEED;
+}
#define MAXWORKERS 64
@@ -408,7 +645,7 @@ typedef struct {
int besteffort;
bte *rowerror;
int errorcnt;
- LoadOps *loadops;
+ struct directappend *directappend;
struct scratch_buffer scratch;
struct scratch_buffer primary;
struct scratch_buffer secondary;
@@ -689,7 +926,7 @@ report_append_failed(READERtask *task, C
char *err = NULL;
/* failure */
if (task->rowerror) {
- lng row = BATcount(task->loadops ? task->loadops->get_offsets(task->loadops->state) : fmt->c);
+ lng row = BATcount(task->directappend ? directappend_get_offsets_bat(task->directappend) : fmt->c);
MT_lock_set(&errorlock);
if (task->cntxt->error_row == NULL ||
BUNappend(task->cntxt->error_row, &row, false) !=
GDK_SUCCEED ||
@@ -717,7 +954,7 @@ SQLworker_onebyone_column(READERtask *ta
if (SQLconvert_val(task, col, i, &fmt->data, &fmt->len) < 0)
return -1;
const void *value = fmt->data ? fmt->data : fmt->nildata;
- str msg = task->loadops->append_one(task->loadops->state, i, value, fmt->appendcol);
+ str msg = directappend_append_one(task->directappend, i, value, fmt->appendcol);
if (msg != MAL_SUCCEED) {
report_append_failed(task, fmt, i, col + 1);
return -1;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list