Changeset: 6da178705fa7 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/6da178705fa7 Modified Files: gdk/gdk_logger.c gdk/gdk_logger_internals.h Branch: logger-fix Log Message:
clean up synchronization gdk_logger diffs (truncated from 1261 to 300 lines): diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -235,8 +235,8 @@ log_write_format(logger *lg, logformat * { assert(data->id || data->flag); assert(!lg->inmemory); - if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 && - mnstr_writeInt(lg->output_log, data->id)) + if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 && + mnstr_writeInt(lg->current->output_log, data->id)) return GDK_SUCCEED; TRC_CRITICAL(GDK, "write failed\n"); return GDK_FAIL; @@ -281,7 +281,7 @@ log_write_id(logger *lg, int id) { assert(!lg->inmemory); assert(id >= 0); - if (mnstr_writeInt(lg->output_log, id)) + if (mnstr_writeInt(lg->current->output_log, id)) return GDK_SUCCEED; TRC_CRITICAL(GDK, "write failed\n"); return GDK_FAIL; @@ -300,7 +300,7 @@ log_read_id(logger *lg, log_id *id) #endif static log_return -string_reader(logger *lg, BAT *b, lng nr) +string_reader(logger *lg, char* rbuf, size_t* rbufsize, BAT *b, lng nr) { size_t sz = 0; lng SZ = 0; @@ -310,24 +310,22 @@ string_reader(logger *lg, BAT *b, lng nr if (mnstr_readLng(lg->input_log, &SZ) != 1) return LOG_EOF; sz = (size_t)SZ; - char *buf = lg->buf; - if (lg->bufsize < sz) { - if (!(buf = GDKrealloc(lg->buf, sz))) + if (*rbufsize < sz) { + if (!(rbuf = GDKrealloc(rbuf, sz))) return LOG_ERR; - lg->buf = buf; - lg->bufsize = sz; + *rbufsize = sz; } - if (mnstr_read(lg->input_log, buf, sz, 1) != 1) + if (mnstr_read(lg->input_log, rbuf, sz, 1) != 1) return LOG_EOF; /* handle strings */ - char *t = buf; + char *t = rbuf; /* chunked */ #define CHUNK_SIZE 1024 char *strings[CHUNK_SIZE]; int cur = 0; - for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) { + for(; nr>0 && res == LOG_OK && t < (rbuf+sz); nr--) { strings[cur++] = t; if (cur == CHUNK_SIZE && b && BUNappendmulti(b, strings, cur, true) != GDK_SUCCEED) res = LOG_ERR; @@ -370,6 +368,8 @@ log_read_updates(logger *lg, trans *tr, pnr = nr; tpe = find_type_nr(lg, type_id); if (tpe >= 0) { + size_t rbufsize = 64*1024; + void * rbuf = GDKmalloc(rbufsize); BAT *uid = NULL; BAT *r = NULL; void *(*rt) (ptr, size_t *, stream *, size_t) = BATatoms[tpe].atomRead; @@ -379,13 +379,17 @@ log_read_updates(logger *lg, trans *tr, if (!lg->flushing && l->flag == LOG_UPDATE) { uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT); if (uid == NULL) { + GDKfree(rbuf); return LOG_ERR; } } if (l->flag == LOG_UPDATE_CONST) { - if (mnstr_readLng(lg->input_log, &offset) != 1) + if (mnstr_readLng(lg->input_log, &offset) != 1) { + GDKfree(rbuf); return LOG_ERR; + + } if (cands) { // This const range actually represents a segment of candidates corresponding to updated bat entries @@ -420,11 +424,12 @@ log_read_updates(logger *lg, trans *tr, } // We have to read the value to update the read cursor - size_t tlen = lg->bufsize; - void *t = rt(lg->buf, &tlen, lg->input_log, 1); + size_t tlen = rbufsize; + void *t = rt(rbuf, &tlen, lg->input_log, 1); if (t == NULL) { res = LOG_ERR; } + GDKfree(rbuf); return res; } } @@ -434,18 +439,19 @@ log_read_updates(logger *lg, trans *tr, if (r == NULL) { if (uid) BBPreclaim(uid); + GDKfree(rbuf); return LOG_ERR; } } if (l->flag == LOG_UPDATE_CONST) { - size_t tlen = lg->bufsize; - void *t = rt(lg->buf, &tlen, lg->input_log, 1); + size_t tlen = rbufsize; + void *t = rt(rbuf, &tlen, lg->input_log, 1); if (t == NULL) { res = LOG_ERR; } else { - lg->buf = t; - lg->bufsize = tlen; + rbuf = t; + rbufsize = tlen; for(BUN p = 0; p<(BUN) nr; p++) { if (r && BUNappend(r, t, true) != GDK_SUCCEED) res = LOG_ERR; @@ -455,6 +461,7 @@ log_read_updates(logger *lg, trans *tr, if (mnstr_readLng(lg->input_log, &offset) != 1) { if (r) BBPreclaim(r); + GDKfree(rbuf); return LOG_ERR; } if (tpe == TYPE_msk) { @@ -464,42 +471,42 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - size_t tlen = lg->bufsize/sizeof(int); + size_t tlen = rbufsize/sizeof(int); size_t cnt = 0, snr = (size_t)nr; snr = (snr+31)/32; assert(tlen); for (; res == LOG_OK && snr > 0; snr-=cnt) { cnt = snr>tlen?tlen:snr; - if (!mnstr_readIntArray(lg->input_log, lg->buf, cnt)) + if (!mnstr_readIntArray(lg->input_log, rbuf, cnt)) res = LOG_ERR; } } } else { if (!ATOMvarsized(tpe)) { size_t cnt = 0, snr = (size_t)nr; - size_t tlen = lg->bufsize/ATOMsize(tpe), ntlen = lg->bufsize; + size_t tlen = rbufsize/ATOMsize(tpe), ntlen = rbufsize; assert(tlen); /* read in chunks of max * BUFSIZE/width rows */ for (; res == LOG_OK && snr > 0; snr-=cnt) { cnt = snr>tlen?tlen:snr; - void *t = rt(lg->buf, &ntlen, lg->input_log, cnt); + void *t = rt(rbuf, &ntlen, lg->input_log, cnt); if (t == NULL) { res = LOG_EOF; break; } - assert(t == lg->buf); + assert(t == rbuf); if (r && BUNappendmulti(r, t, cnt, true) != GDK_SUCCEED) res = LOG_ERR; } } else if (tpe == TYPE_str) { /* efficient string */ - res = string_reader(lg, r, nr); + res = string_reader(lg, rbuf, &rbufsize, r, nr); } else { for (; res == LOG_OK && nr > 0; nr--) { - size_t tlen = lg->bufsize; - void *t = rt(lg->buf, &tlen, lg->input_log, 1); + size_t tlen = rbufsize; + void *t = rt(rbuf, &tlen, lg->input_log, 1); if (t == NULL) { /* see if failure was due to @@ -511,8 +518,8 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - lg->buf = t; - lg->bufsize = tlen; + rbuf = t; + rbufsize = tlen; if (r && BUNappend(r, t, true) != GDK_SUCCEED) res = LOG_ERR; } @@ -559,11 +566,11 @@ log_read_updates(logger *lg, trans *tr, } } else if (tpe == TYPE_str) { /* efficient string */ - res = string_reader(lg, r, nr); + res = string_reader(lg, rbuf, &rbufsize, r, nr); } else { for (; res == LOG_OK && nr > 0; nr--) { - size_t tlen = lg->bufsize; - void *t = rt(lg->buf, &tlen, lg->input_log, 1); + size_t tlen = rbufsize; + void *t = rt(rbuf, &tlen, lg->input_log, 1); if (t == NULL) { if (strstr(GDKerrbuf, "malloc") == NULL) @@ -571,8 +578,8 @@ log_read_updates(logger *lg, trans *tr, else res = LOG_ERR; } else { - lg->buf = t; - lg->bufsize = tlen; + rbuf = t; + rbufsize = tlen; if ((r && BUNappend(r, t, true) != GDK_SUCCEED)) res = LOG_ERR; } @@ -622,6 +629,7 @@ log_read_updates(logger *lg, trans *tr, else if (uid) BBPreclaim(uid); } + GDKfree(rbuf); } else { /* bat missing ERROR or ignore ? currently error. */ res = LOG_ERR; @@ -1039,18 +1047,37 @@ log_create_types_file(logger *lg, const return GDK_SUCCEED; } +static inline void +rotation_lock(logger *lg) { + MT_lock_set(&lg->rotation_lock); +} + +static inline void +rotation_unlock(logger *lg) { + MT_lock_unset(&lg->rotation_lock); +} + +static inline bool +try_rotation_lock(logger *lg) { + return MT_lock_try(&lg->rotation_lock); +} + static gdk_return log_open_output(logger *lg) { logged_range *new_range = (logged_range*)GDKmalloc(sizeof(logged_range)); + ATOMIC_INIT(&new_range->refcount, 1); + if (!new_range) { TRC_CRITICAL(GDK, "allocation failure\n"); return GDK_FAIL; } - lg->end = 0; - lg->drops = 0; + ATOMIC_INIT(&new_range->end, 0); + ATOMIC_INIT(&new_range->pend, 0); + ATOMIC_INIT(&new_range->flushed_end, 0); + ATOMIC_INIT(&new_range->drops, 0); if (!LOG_DISABLED(lg)) { char id[32]; char *filename; @@ -1068,14 +1095,13 @@ log_open_output(logger *lg) if (lg->debug & 1) fprintf(stderr, "#log_open_output: %s.%s\n", LOGFILE, id); - lg->output_log = open_wstream(filename); - if (lg->output_log) { + new_range->output_log = open_wstream(filename); + if (new_range->output_log) { short byteorder = 1234; - mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1); + mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1); } - lg->end = 0; - - if (lg->output_log == NULL || mnstr_errnr(lg->output_log) != MNSTR_NO__ERROR) { + + if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) { TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL)); GDKfree(new_range); GDKfree(filename); @@ -1084,15 +1110,11 @@ log_open_output(logger *lg) GDKfree(filename); } new_range->id = lg->id; - new_range->first_tid = lg->tid; - new_range->last_tid = lg->tid; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org