Changeset: 40690d9a6feb for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/40690d9a6feb
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger_internals.h
Branch: logger-cleanup
Log Message:
clean up synchronization gdk_logger
diffs (truncated from 1280 to 300 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -97,8 +97,17 @@ typedef enum {LOG_OK, LOG_EOF, LOG_ERR}
static gdk_return bm_commit(logger *lg);
static gdk_return tr_grow(trans *tr);
-#define log_lock(lg) MT_lock_set(&(lg)->lock)
-#define log_unlock(lg) MT_lock_unset(&(lg)->lock)
+static inline void
+log_lock(logger *lg)
+{
+ MT_lock_set(&lg->lock);
+}
+
+static inline void
+log_unlock(logger *lg)
+{
+ MT_lock_unset(&lg->lock);
+}
static inline bte
find_type(logger *lg, int tpe)
@@ -225,8 +234,8 @@ log_write_format(logger *lg, logformat *
{
assert(data->id || data->flag);
assert(!lg->inmemory);
- if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
- mnstr_writeInt(lg->output_log, data->id))
+ if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
+ mnstr_writeInt(lg->current->output_log, data->id))
return GDK_SUCCEED;
TRC_CRITICAL(GDK, "write failed\n");
return GDK_FAIL;
@@ -271,7 +280,7 @@ log_write_id(logger *lg, int id)
{
assert(!lg->inmemory);
assert(id >= 0);
- if (mnstr_writeInt(lg->output_log, id))
+ if (mnstr_writeInt(lg->current->output_log, id))
return GDK_SUCCEED;
TRC_CRITICAL(GDK, "write failed\n");
return GDK_FAIL;
@@ -290,7 +299,7 @@ log_read_id(logger *lg, log_id *id)
#endif
static log_return
-string_reader(logger *lg, BAT *b, lng nr)
+string_reader(logger *lg, char* rbuf, size_t* rbufsize, BAT *b, lng nr)
{
size_t sz = 0;
lng SZ = 0;
@@ -300,24 +309,22 @@ string_reader(logger *lg, BAT *b, lng nr
if (mnstr_readLng(lg->input_log, &SZ) != 1)
return LOG_EOF;
sz = (size_t)SZ;
- char *buf = lg->buf;
- if (lg->bufsize < sz) {
- if (!(buf = GDKrealloc(lg->buf, sz)))
+ if (*rbufsize < sz) {
+ if (!(rbuf = GDKrealloc(rbuf, sz)))
return LOG_ERR;
- lg->buf = buf;
- lg->bufsize = sz;
+ *rbufsize = sz;
}
- if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
+ if (mnstr_read(lg->input_log, rbuf, sz, 1) != 1)
return LOG_EOF;
/* handle strings */
- char *t = buf;
+ char *t = rbuf;
/* chunked */
#define CHUNK_SIZE 1024
char *strings[CHUNK_SIZE];
int cur = 0;
- for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
+ for(; nr>0 && res == LOG_OK && t < (rbuf+sz); nr--) {
strings[cur++] = t;
if (cur == CHUNK_SIZE && b && BUNappendmulti(b,
strings, cur, true) != GDK_SUCCEED)
res = LOG_ERR;
@@ -360,6 +367,8 @@ log_read_updates(logger *lg, trans *tr,
pnr = nr;
tpe = find_type_nr(lg, type_id);
if (tpe >= 0) {
+ size_t rbufsize = 64*1024;
+ void * rbuf = GDKmalloc(rbufsize);
BAT *uid = NULL;
BAT *r = NULL;
void *(*rt) (ptr, size_t *, stream *, size_t) =
BATatoms[tpe].atomRead;
@@ -369,13 +378,17 @@ log_read_updates(logger *lg, trans *tr,
if (!lg->flushing && l->flag == LOG_UPDATE) {
uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
if (uid == NULL) {
+ GDKfree(rbuf);
return LOG_ERR;
}
}
if (l->flag == LOG_UPDATE_CONST) {
- if (mnstr_readLng(lg->input_log, &offset) != 1)
+ if (mnstr_readLng(lg->input_log, &offset) != 1) {
+ GDKfree(rbuf);
return LOG_ERR;
+
+ }
if (cands) {
// This const range actually represents a
segment of candidates corresponding to updated bat entries
@@ -410,11 +423,12 @@ log_read_updates(logger *lg, trans *tr,
}
// We have to read the value to update the read
cursor
- size_t tlen = lg->bufsize;
- void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+ size_t tlen = rbufsize;
+ void *t = rt(rbuf, &tlen, lg->input_log, 1);
if (t == NULL) {
res = LOG_ERR;
}
+ GDKfree(rbuf);
return res;
}
}
@@ -424,18 +438,19 @@ log_read_updates(logger *lg, trans *tr,
if (r == NULL) {
if (uid)
BBPreclaim(uid);
+ GDKfree(rbuf);
return LOG_ERR;
}
}
if (l->flag == LOG_UPDATE_CONST) {
- size_t tlen = lg->bufsize;
- void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+ size_t tlen = rbufsize;
+ void *t = rt(rbuf, &tlen, lg->input_log, 1);
if (t == NULL) {
res = LOG_ERR;
} else {
- lg->buf = t;
- lg->bufsize = tlen;
+ rbuf = t;
+ rbufsize = tlen;
for(BUN p = 0; p<(BUN) nr; p++) {
if (r && BUNappend(r, t, true) !=
GDK_SUCCEED)
res = LOG_ERR;
@@ -445,6 +460,7 @@ log_read_updates(logger *lg, trans *tr,
if (mnstr_readLng(lg->input_log, &offset) != 1) {
if (r)
BBPreclaim(r);
+ GDKfree(rbuf);
return LOG_ERR;
}
if (tpe == TYPE_msk) {
@@ -454,42 +470,42 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- size_t tlen = lg->bufsize/sizeof(int);
+ size_t tlen = rbufsize/sizeof(int);
size_t cnt = 0, snr = (size_t)nr;
snr = (snr+31)/32;
assert(tlen);
for (; res == LOG_OK && snr > 0;
snr-=cnt) {
cnt = snr>tlen?tlen:snr;
- if
(!mnstr_readIntArray(lg->input_log, lg->buf, cnt))
+ if
(!mnstr_readIntArray(lg->input_log, rbuf, cnt))
res = LOG_ERR;
}
}
} else {
if (!ATOMvarsized(tpe)) {
size_t cnt = 0, snr = (size_t)nr;
- size_t tlen =
lg->bufsize/ATOMsize(tpe), ntlen = lg->bufsize;
+ size_t tlen = rbufsize/ATOMsize(tpe),
ntlen = rbufsize;
assert(tlen);
/* read in chunks of max
* BUFSIZE/width rows */
for (; res == LOG_OK && snr > 0;
snr-=cnt) {
cnt = snr>tlen?tlen:snr;
- void *t = rt(lg->buf, &ntlen,
lg->input_log, cnt);
+ void *t = rt(rbuf, &ntlen,
lg->input_log, cnt);
if (t == NULL) {
res = LOG_EOF;
break;
}
- assert(t == lg->buf);
+ assert(t == rbuf);
if (r && BUNappendmulti(r, t,
cnt, true) != GDK_SUCCEED)
res = LOG_ERR;
}
} else if (tpe == TYPE_str) {
/* efficient string */
- res = string_reader(lg, r, nr);
+ res = string_reader(lg, rbuf,
&rbufsize, r, nr);
} else {
for (; res == LOG_OK && nr > 0; nr--) {
- size_t tlen = lg->bufsize;
- void *t = rt(lg->buf, &tlen,
lg->input_log, 1);
+ size_t tlen = rbufsize;
+ void *t = rt(rbuf, &tlen,
lg->input_log, 1);
if (t == NULL) {
/* see if failure was
due to
@@ -501,8 +517,8 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- lg->buf = t;
- lg->bufsize = tlen;
+ rbuf = t;
+ rbufsize = tlen;
if (r && BUNappend(r,
t, true) != GDK_SUCCEED)
res = LOG_ERR;
}
@@ -549,11 +565,11 @@ log_read_updates(logger *lg, trans *tr,
}
} else if (tpe == TYPE_str) {
/* efficient string */
- res = string_reader(lg, r, nr);
+ res = string_reader(lg, rbuf, &rbufsize, r, nr);
} else {
for (; res == LOG_OK && nr > 0; nr--) {
- size_t tlen = lg->bufsize;
- void *t = rt(lg->buf, &tlen,
lg->input_log, 1);
+ size_t tlen = rbufsize;
+ void *t = rt(rbuf, &tlen,
lg->input_log, 1);
if (t == NULL) {
if (strstr(GDKerrbuf, "malloc")
== NULL)
@@ -561,8 +577,8 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- lg->buf = t;
- lg->bufsize = tlen;
+ rbuf = t;
+ rbufsize = tlen;
if ((r && BUNappend(r, t, true)
!= GDK_SUCCEED))
res = LOG_ERR;
}
@@ -612,6 +628,7 @@ log_read_updates(logger *lg, trans *tr,
else if (uid)
BBPreclaim(uid);
}
+ GDKfree(rbuf);
} else {
/* bat missing ERROR or ignore ? currently error. */
res = LOG_ERR;
@@ -1028,18 +1045,37 @@ log_create_types_file(logger *lg, const
return GDK_SUCCEED;
}
+static inline void
+rotation_lock(logger *lg) {
+ MT_lock_set(&lg->rotation_lock);
+}
+
+static inline void
+rotation_unlock(logger *lg) {
+ MT_lock_unset(&lg->rotation_lock);
+}
+
+static inline bool
+try_rotation_lock(logger *lg) {
+ return MT_lock_try(&lg->rotation_lock);
+}
+
static gdk_return
log_open_output(logger *lg)
{
logged_range *new_range =
(logged_range*)GDKmalloc(sizeof(logged_range));
+ ATOMIC_INIT(&new_range->refcount, 1);
+
if (!new_range) {
TRC_CRITICAL(GDK, "allocation failure\n");
return GDK_FAIL;
}
- lg->end = 0;
- lg->drops = 0;
+ ATOMIC_INIT(&new_range->end, 0);
+ ATOMIC_INIT(&new_range->pend, 0);
+ ATOMIC_INIT(&new_range->flushed_end, 0);
+ ATOMIC_INIT(&new_range->drops, 0);
if (!LOG_DISABLED(lg)) {
char id[32];
char *filename;
@@ -1057,14 +1093,13 @@ log_open_output(logger *lg)
if (lg->debug & 1)
fprintf(stderr, "#log_open_output: %s.%s\n", LOGFILE,
id);
- lg->output_log = open_wstream(filename);
- if (lg->output_log) {
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]