Changeset: 8f5ebc50168a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8f5ebc50168a
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger_internals.h
gdk/gdk_logger_old.c
Branch: logger-fix
Log Message:
merged with sep2022
diffs (truncated from 433 to 300 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -300,7 +300,7 @@ log_read_id(logger *lg, log_id *id)
#endif
static log_return
-string_reader(logger *lg, char** rbuf, size_t* rbufsize, BAT *b, lng nr)
+string_reader(logger *lg, BAT *b, lng nr)
{
size_t sz = 0;
lng SZ = 0;
@@ -310,22 +310,24 @@ string_reader(logger *lg, char** rbuf, s
if (mnstr_readLng(lg->input_log, &SZ) != 1)
return LOG_EOF;
sz = (size_t)SZ;
- if (*rbufsize < sz) {
- if (!(*rbuf = GDKrealloc(*rbuf, sz)))
+ char *buf = lg->rbuf;
+ if (lg->rbufsize < sz) {
+ if (!(buf = GDKrealloc(lg->rbuf, sz)))
return LOG_ERR;
- *rbufsize = sz;
+ lg->rbuf = buf;
+ lg->rbufsize = sz;
}
- if (mnstr_read(lg->input_log, *rbuf, sz, 1) != 1)
+ if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
return LOG_EOF;
/* handle strings */
- char *t = *rbuf;
+ char *t = buf;
/* chunked */
#define CHUNK_SIZE 1024
char *strings[CHUNK_SIZE];
int cur = 0;
- for(; nr>0 && res == LOG_OK && t < (*rbuf+sz); nr--) {
+ for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
strings[cur++] = t;
if (cur == CHUNK_SIZE && b && BUNappendmulti(b,
strings, cur, true) != GDK_SUCCEED)
res = LOG_ERR;
@@ -368,8 +370,6 @@ log_read_updates(logger *lg, trans *tr,
pnr = nr;
tpe = find_type_nr(lg, type_id);
if (tpe >= 0) {
- size_t rbufsize = 64*1024;
- void * rbuf = GDKmalloc(rbufsize);
BAT *uid = NULL;
BAT *r = NULL;
void *(*rt) (ptr, size_t *, stream *, size_t) =
BATatoms[tpe].atomRead;
@@ -379,17 +379,13 @@ log_read_updates(logger *lg, trans *tr,
if (!lg->flushing && l->flag == LOG_UPDATE) {
uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
if (uid == NULL) {
- GDKfree(rbuf);
return LOG_ERR;
}
}
if (l->flag == LOG_UPDATE_CONST) {
- if (mnstr_readLng(lg->input_log, &offset) != 1) {
- GDKfree(rbuf);
+ if (mnstr_readLng(lg->input_log, &offset) != 1)
return LOG_ERR;
-
- }
if (cands) {
// This const range actually represents a
segment of candidates corresponding to updated bat entries
@@ -413,8 +409,7 @@ log_read_updates(logger *lg, trans *tr,
}
else
res = LOG_ERR;
- }
- else {
+ } else {
assert((*cands)->ttype ==
TYPE_oid);
assert(BATcount(*cands) > 0);
if (BATappend(*cands, dense,
NULL, true) != GDK_SUCCEED)
@@ -424,12 +419,11 @@ log_read_updates(logger *lg, trans *tr,
}
// We have to read the value to update the read
cursor
- size_t tlen = rbufsize;
- void *t = rt(rbuf, &tlen, lg->input_log, 1);
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
if (t == NULL) {
res = LOG_ERR;
}
- GDKfree(rbuf);
return res;
}
}
@@ -439,19 +433,18 @@ log_read_updates(logger *lg, trans *tr,
if (r == NULL) {
if (uid)
BBPreclaim(uid);
- GDKfree(rbuf);
return LOG_ERR;
}
}
if (l->flag == LOG_UPDATE_CONST) {
- size_t tlen = rbufsize;
- void *t = rt(rbuf, &tlen, lg->input_log, 1);
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
if (t == NULL) {
res = LOG_ERR;
} else {
- rbuf = t;
- rbufsize = tlen;
+ lg->rbuf = t;
+ lg->rbufsize = tlen;
for(BUN p = 0; p<(BUN) nr; p++) {
if (r && BUNappend(r, t, true) !=
GDK_SUCCEED)
res = LOG_ERR;
@@ -461,7 +454,6 @@ log_read_updates(logger *lg, trans *tr,
if (mnstr_readLng(lg->input_log, &offset) != 1) {
if (r)
BBPreclaim(r);
- GDKfree(rbuf);
return LOG_ERR;
}
if (tpe == TYPE_msk) {
@@ -471,44 +463,42 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- size_t tlen = rbufsize/sizeof(int);
+ size_t tlen = lg->rbufsize/sizeof(int);
size_t cnt = 0, snr = (size_t)nr;
snr = (snr+31)/32;
assert(tlen);
for (; res == LOG_OK && snr > 0;
snr-=cnt) {
cnt = snr>tlen?tlen:snr;
- if
(!mnstr_readIntArray(lg->input_log, rbuf, cnt))
+ if
(!mnstr_readIntArray(lg->input_log, lg->rbuf, cnt))
res = LOG_ERR;
}
}
} else {
if (!ATOMvarsized(tpe)) {
size_t cnt = 0, snr = (size_t)nr;
- size_t tlen = rbufsize/ATOMsize(tpe),
ntlen = rbufsize;
+ size_t tlen =
lg->rbufsize/ATOMsize(tpe), ntlen = lg->rbufsize;
assert(tlen);
/* read in chunks of max
* BUFSIZE/width rows */
for (; res == LOG_OK && snr > 0;
snr-=cnt) {
cnt = snr>tlen?tlen:snr;
- void *t = rt(rbuf, &ntlen,
lg->input_log, cnt);
+ void *t = rt(lg->rbuf, &ntlen,
lg->input_log, cnt);
if (t == NULL) {
res = LOG_EOF;
break;
}
- assert(t == rbuf);
+ assert(t == lg->rbuf);
if (r && BUNappendmulti(r, t,
cnt, true) != GDK_SUCCEED)
res = LOG_ERR;
}
} else if (tpe == TYPE_str) {
/* efficient string */
- char* cbuf = rbuf;
- res = string_reader(lg, &cbuf,
&rbufsize, r, nr);
- rbuf = cbuf;
+ res = string_reader(lg, r, nr);
} else {
for (; res == LOG_OK && nr > 0; nr--) {
- size_t tlen = rbufsize;
- void *t = rt(rbuf, &tlen,
lg->input_log, 1);
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen,
lg->input_log, 1);
if (t == NULL) {
/* see if failure was
due to
@@ -520,8 +510,8 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- rbuf = t;
- rbufsize = tlen;
+ lg->rbuf = t;
+ lg->rbufsize = tlen;
if (r && BUNappend(r,
t, true) != GDK_SUCCEED)
res = LOG_ERR;
}
@@ -568,13 +558,11 @@ log_read_updates(logger *lg, trans *tr,
}
} else if (tpe == TYPE_str) {
/* efficient string */
- char* cbuf = rbuf;
- res = string_reader(lg, &cbuf, &rbufsize, r,
nr);
- rbuf = cbuf;
+ res = string_reader(lg, r, nr);
} else {
for (; res == LOG_OK && nr > 0; nr--) {
- size_t tlen = rbufsize;
- void *t = rt(rbuf, &tlen,
lg->input_log, 1);
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen,
lg->input_log, 1);
if (t == NULL) {
if (strstr(GDKerrbuf, "malloc")
== NULL)
@@ -582,8 +570,8 @@ log_read_updates(logger *lg, trans *tr,
else
res = LOG_ERR;
} else {
- rbuf = t;
- rbufsize = tlen;
+ lg->rbuf = t;
+ lg->rbufsize = tlen;
if ((r && BUNappend(r, t, true)
!= GDK_SUCCEED))
res = LOG_ERR;
}
@@ -633,7 +621,6 @@ log_read_updates(logger *lg, trans *tr,
else if (uid)
BBPreclaim(uid);
}
- GDKfree(rbuf);
} else {
/* bat missing ERROR or ignore ? currently error. */
res = LOG_ERR;
@@ -1066,7 +1053,6 @@ log_open_output(logger *lg)
{
logged_range *new_range =
(logged_range*)GDKmalloc(sizeof(logged_range));
-
if (!new_range) {
TRC_CRITICAL(GDK, "allocation failure\n");
return GDK_FAIL;
@@ -2139,7 +2125,8 @@ log_load(int debug, const char *fn, cons
MT_lock_destroy(&lg->rotation_lock);
GDKfree(lg->fn);
GDKfree(lg->dir);
- GDKfree(lg->buf);
+ GDKfree(lg->rbuf);
+ GDKfree(lg->wbuf);
GDKfree(lg);
GDKdebug = dbg;
return GDK_FAIL;
@@ -2185,13 +2172,17 @@ log_new(int debug, const char *fn, const
}
lg->fn = GDKstrdup(fn);
lg->dir = GDKstrdup(filename);
- lg->bufsize = 64*1024;
- lg->buf = GDKmalloc(lg->bufsize);
- if (lg->fn == NULL || lg->dir == NULL || lg->buf == NULL) {
+ lg->rbufsize = 64*1024;
+ lg->rbuf = GDKmalloc(lg->rbufsize);
+ lg->wbufsize = 64*1024;
+ lg->wbuf = GDKmalloc(lg->wbufsize);
+ if (lg->fn == NULL || lg->dir == NULL ||
+ lg->rbuf == NULL || lg->wbuf == NULL) {
TRC_CRITICAL(GDK, "strdup failed\n");
GDKfree(lg->fn);
GDKfree(lg->dir);
- GDKfree(lg->buf);
+ GDKfree(lg->rbuf);
+ GDKfree(lg->wbuf);
GDKfree(lg);
return NULL;
}
@@ -2216,7 +2207,7 @@ do_flush_range_cleanup(logger* lg) {
rotation_lock(lg);
logged_range* frange = lg->flush_ranges;
logged_range* first = frange;
-
+
while ( frange->next) {
if (ATOMIC_GET(&frange->refcount) > 1)
break;
@@ -2289,7 +2280,8 @@ log_destroy(logger *lg)
ATOMIC_DESTROY(&lg->nr_flushers);
GDKfree(lg->fn);
GDKfree(lg->dir);
- GDKfree(lg->buf);
+ GDKfree(lg->rbuf);
+ GDKfree(lg->wbuf);
GDKfree(lg);
}
@@ -2529,9 +2521,9 @@ log_constant(logger *lg, int type, ptr v
static gdk_return
string_writer(logger *lg, BAT *b, lng offset, lng nr)
{
- size_t bufsz = lg->bufsize, resize = 0;
+ size_t bufsz = lg->wbufsize, resize = 0;
BUN end = (BUN)(offset + nr);
- char *buf = lg->buf;
+ char *buf = lg->wbuf;
gdk_return res = GDK_SUCCEED;
if (!buf)
@@ -2541,12 +2533,12 @@ string_writer(logger *lg, BAT *b, lng of
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]