Changeset: 51b61fd240e0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/51b61fd240e0
Modified Files:
        gdk/gdk_logger.c
Branch: Dec2025
Log Message:

Use private allocator during WAL processing.


diffs (177 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -87,6 +87,7 @@ typedef struct trans {
        logaction *changes;
 
        struct trans *tr;
+       allocator_state ta_state;
 } trans;
 
 typedef struct logformat_t {
@@ -1042,22 +1043,25 @@ log_write_new_types(logger *lg, FILE *fp
 #define TR_SIZE                1024
 
 static trans *
-tr_create(trans *tr, int tid)
+tr_create(allocator *ta, trans *tr, int tid)
 {
-       trans *ntr = GDKmalloc(sizeof(trans));
-
-       if (ntr == NULL)
-               return NULL;
-       ntr->tid = tid;
-       ntr->sz = TR_SIZE;
-       ntr->nr = 0;
-       ntr->changes = GDKmalloc(sizeof(logaction) * TR_SIZE);
-       if (ntr->changes == NULL) {
-               GDKfree(ntr);
-               return NULL;
+       if (tr)
+               tr->ta_state = ma_open(ta);
+       trans *ntr = ma_alloc(ta, sizeof(trans));
+
+       if (ntr != NULL) {
+               ntr->tid = tid;
+               ntr->sz = TR_SIZE;
+               ntr->nr = 0;
+               ntr->changes = ma_alloc(ta, sizeof(logaction) * TR_SIZE);
+               if (ntr->changes != NULL) {
+                       ntr->tr = tr;
+                       return ntr;
+               }
        }
-       ntr->tr = tr;
-       return ntr;
+       if (tr)
+               ma_close(&tr->ta_state);
+       return NULL;
 }
 
 static gdk_return
@@ -1098,8 +1102,12 @@ tr_grow(trans *tr)
 {
        if (tr->nr == tr->sz) {
                logaction *changes;
+               int sz = tr->sz;
                tr->sz <<= 1;
-               changes = GDKrealloc(tr->changes, tr->sz * sizeof(logaction));
+               changes = ma_realloc(MT_thread_getallocator(),
+                                    tr->changes,
+                                    sz * sizeof(logaction),
+                                    tr->sz * sizeof(logaction));
                if (changes == NULL)
                        return GDK_FAIL;
                tr->changes = changes;
@@ -1113,9 +1121,8 @@ static trans *
 tr_destroy(trans *tr)
 {
        trans *r = tr->tr;
-
-       GDKfree(tr->changes);
-       GDKfree(tr);
+       if (r)
+               ma_close(&r->ta_state);
        return r;
 }
 
@@ -1401,6 +1408,8 @@ log_read_transaction(logger *lg, BAT *id
        ATOMIC_BASE_TYPE dbg = ATOMIC_GET(&GDKdebug);
        time_t t0 = 0;
        size_t fs = 0;
+       allocator *ta = MT_thread_getallocator();
+       allocator_state ta_state = ma_open(ta);
 
        (void) maxupdated;      /* only used inside assert() */
 
@@ -1509,7 +1518,7 @@ log_read_transaction(logger *lg, BAT *id
                        assert(!lg->flushing || l.id <= lg->tid);
                        if (!lg->flushing && l.id > lg->tid)
                                lg->tid = l.id; /* should only happen during 
initialization */
-                       if ((tr = tr_create(tr, l.id)) == NULL) {
+                       if ((tr = tr_create(ta, tr, l.id)) == NULL) {
                                TRC_CRITICAL(GDK, "memory allocation failed\n");
                                err = LOG_ERR;
                                break;
@@ -1587,6 +1596,7 @@ log_read_transaction(logger *lg, BAT *id
                TRC_WARNING(GDK, "aborting transaction\n");
                tr = tr_abort(lg, tr);
        }
+       ma_close(&ta_state);
        if (!lg->flushing)
                ATOMIC_SET(&GDKdebug, dbg);
 
@@ -2935,6 +2945,8 @@ log_flush(logger *lg, ulng ts)
        log_return res = LOG_OK;
        ulng cid = olid;
        assert(lid <= lgid);
+       allocator *ta = MT_thread_getallocator();
+       allocator_state ta_state = ma_open(ta);
        uint32_t *updated = NULL;
        BUN nupdated = 0;
        size_t allocated = 0;
@@ -2943,23 +2955,23 @@ log_flush(logger *lg, ulng ts)
                        char filename[MAXPATH];
                        char id[32];
                        if (snprintf(id, sizeof(id), ULLFMT, cid + 1) >= (int) 
sizeof(id)) {
-                               GDKfree(updated);
+                               ma_close(&ta_state);
                                TRC_CRITICAL(GDK, "log_id filename is too 
large\n");
                                return GDK_FAIL;
                        }
                        if (GDKfilepath(filename, sizeof(filename), 
BBPselectfarm(PERSISTENT, 0, offheap), lg->dir, LOGFILE, id) != GDK_SUCCEED) {
-                               GDKfree(updated);
+                               ma_close(&ta_state);
                                return GDK_FAIL;
                        }
                        if (strlen(filename) >= FILENAME_MAX) {
-                               GDKfree(updated);
+                               ma_close(&ta_state);
                                TRC_CRITICAL(GDK, "Logger filename path is too 
large\n");
                                return GDK_FAIL;
                        }
 
                        bool filemissing = false;
                        if (log_open_input(lg, filename, &filemissing) != 
GDK_SUCCEED) {
-                               GDKfree(updated);
+                               ma_close(&ta_state);
                                return GDK_FAIL;
                        }
                }
@@ -2970,22 +2982,22 @@ log_flush(logger *lg, ulng ts)
                        allocated = ((nupdated + 31) & ~31) / 8;
                        if (allocated == 0)
                                allocated = 4;
-                       updated = GDKzalloc(allocated);
+                       updated = ma_zalloc(ta, allocated);
                        if (updated == NULL) {
                                log_unlock(lg);
+                               ma_close(&ta_state);
                                return GDK_FAIL;
                        }
                } else if (nupdated < BATcount(lg->catalog_id)) {
                        BUN n = BATcount(lg->catalog_id);
                        size_t a = ((n + 31) & ~31) / 8;
                        if (a > allocated) {
-                               uint32_t *p = GDKrealloc(updated, a);
-                               if (p == NULL) {
-                                       GDKfree(updated);
+                               updated = ma_realloc(ta, updated, allocated, a);
+                               if (updated == NULL) {
+                                       ma_close(&ta_state);
                                        log_unlock(lg);
                                        return GDK_FAIL;
                                }
-                               updated = p;
                                memset(updated + allocated / 4, 0, a - 
allocated);
                                allocated = a;
                        }
@@ -3022,7 +3034,7 @@ log_flush(logger *lg, ulng ts)
                if (res == LOG_OK)
                        log_cleanup_range(lg, lg->saved_id);
        }
-       GDKfree(updated);
+       ma_close(&ta_state);
        return res == LOG_ERR ? GDK_FAIL : GDK_SUCCEED;
 }
 
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to