Changeset: 3c7645722037 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3c7645722037
Modified Files:
        gdk/gdk_logger.c
Branch: Jul2021
Log Message:

Delay releasing the BATs until after the subcommit.


diffs (155 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1441,11 +1441,11 @@ subcommit_list_add(int next, bat *n, BUN
 }
 
 static int
-cleanup_and_swap(logger *lg, const log_bid *bids, lng *lids, lng *cnts, BAT 
*catalog_bid, BAT *catalog_id, BAT *dcatalog, int cleanup)
+cleanup_and_swap(logger *lg, int *r, const log_bid *bids, lng *lids, lng 
*cnts, BAT *catalog_bid, BAT *catalog_id, BAT *dcatalog, int cleanup)
 {
        BAT *nbids, *noids, *ncnts, *nlids, *ndels;
        BUN p, q;
-       int err = 0;
+       int err = 0, rcnt = 0;
 
        oid *poss = Tloc(dcatalog, 0);
        BATloop(dcatalog, p, q) {
@@ -1455,20 +1455,15 @@ cleanup_and_swap(logger *lg, const log_b
                        continue;
 
                if (lids[pos] >= 0) {
-                       if (lg->debug & 1) {
-                               fprintf(stderr, "release %d\n", bids[pos]);
-                               if (BBP_lrefs(bids[pos]) != 2)
-                                       fprintf(stderr, "release %d %d\n", 
bids[pos], BBP_lrefs(bids[pos]));
-                       }
+                       lids[pos] = -1; /* mark as transient */
+                       r[rcnt++] = bids[pos];
 
                        BAT *lb;
 
-                       BBPrelease(bids[pos]);
                        if ((lb = BATdescriptor(bids[pos])) == NULL ||
                                BATmode(lb, true/*transient*/) != GDK_SUCCEED) {
                                TRC_WARNING(GDK, "Failed to set bat(%d) 
transient\n", bids[pos]);
                        }
-                       lids[pos] = -1; /* mark as transient */
                        logbat_destroy(lb);
                }
        }
@@ -1531,9 +1526,9 @@ cleanup_and_swap(logger *lg, const log_b
                logbat_destroy(nlids);
                return -1;
        }
-       BBPrelease(lg->catalog_bid->batCacheid);
-       BBPrelease(lg->catalog_id->batCacheid);
-       BBPrelease(lg->dcatalog->batCacheid);
+       r[rcnt++] = lg->catalog_bid->batCacheid;
+       r[rcnt++] = lg->catalog_id->batCacheid;
+       r[rcnt++] = lg->dcatalog->batCacheid;
 
        logbat_destroy(lg->catalog_bid);
        logbat_destroy(lg->catalog_id);
@@ -1551,7 +1546,7 @@ cleanup_and_swap(logger *lg, const log_b
        lg->cnt = BATcount(lg->catalog_bid);
        lg->deleted -= cleanup;
        assert(lg->deleted == BATcount(lg->dcatalog));
-       return 0;
+       return rcnt;
 }
 
 static gdk_return
@@ -1564,16 +1559,18 @@ bm_subcommit(logger *lg)
        BAT *dcatalog = lg->dcatalog;
        BUN nn = 13 + BATcount(catalog_bid);
        bat *n = GDKmalloc(sizeof(bat) * nn);
+       bat *r = GDKmalloc(sizeof(bat) * nn);
        BUN *sizes = GDKmalloc(sizeof(BUN) * nn);
-       int i = 0;
+       int i = 0, rcnt = 0;
        gdk_return res;
        const log_bid *bids;
        lng *cnts = NULL, *lids = NULL;
        int cleanup = 0;
        lng t0 = 0;
 
-       if (n == NULL || sizes == NULL) {
+       if (n == NULL || r == NULL || sizes == NULL) {
                GDKfree(n);
+               GDKfree(r);
                GDKfree(sizes);
                logger_unlock(lg);
                return GDK_FAIL;
@@ -1605,8 +1602,9 @@ bm_subcommit(logger *lg)
        sizes[i] = BATcount(dcatalog);
        n[i++] = dcatalog->batCacheid;
 
-       if (cleanup && cleanup_and_swap(lg, bids, lids, cnts, catalog_bid, 
catalog_id, dcatalog, cleanup)) {
+       if (cleanup && (rcnt=cleanup_and_swap(lg, r, bids, lids, cnts, 
catalog_bid, catalog_id, dcatalog, cleanup)) < 0) {
                GDKfree(n);
+               GDKfree(r);
                GDKfree(sizes);
                logger_unlock(lg);
                return GDK_FAIL;
@@ -1628,6 +1626,7 @@ bm_subcommit(logger *lg)
                tids = bm_tids(lg->seqs_id, lg->dseqs);
                if (tids == NULL) {
                        GDKfree(n);
+                       GDKfree(r);
                        GDKfree(sizes);
                        logger_unlock(lg);
                        return GDK_FAIL;
@@ -1640,6 +1639,7 @@ bm_subcommit(logger *lg)
                        logbat_destroy(ids);
                        logbat_destroy(vals);
                        GDKfree(n);
+                       GDKfree(r);
                        GDKfree(sizes);
                        logger_unlock(lg);
                        return GDK_FAIL;
@@ -1651,6 +1651,7 @@ bm_subcommit(logger *lg)
                        logbat_destroy(ids);
                        logbat_destroy(vals);
                        GDKfree(n);
+                       GDKfree(r);
                        GDKfree(sizes);
                        logger_unlock(lg);
                        return GDK_FAIL;
@@ -1663,6 +1664,7 @@ bm_subcommit(logger *lg)
                        logbat_destroy(ids);
                        logbat_destroy(vals);
                        GDKfree(n);
+                       GDKfree(r);
                        GDKfree(sizes);
                        logger_unlock(lg);
                        return GDK_FAIL;
@@ -1670,6 +1672,9 @@ bm_subcommit(logger *lg)
                i = subcommit_list_add(i, n, sizes, ids->batCacheid, 
BATcount(ids));
                i = subcommit_list_add(i, n, sizes, vals->batCacheid, 
BATcount(ids));
 
+               r[rcnt++] = lg->seqs_id->batCacheid;
+               r[rcnt++] = lg->seqs_val->batCacheid;
+
                logbat_destroy(lg->seqs_id);
                logbat_destroy(lg->seqs_val);
 
@@ -1688,7 +1693,18 @@ bm_subcommit(logger *lg)
        res = TMsubcommit_list(n, cnts?sizes:NULL, i, lg->saved_id, 
lg->saved_tid);
        if (lg->debug & 1)
                fprintf(stderr, "#subcommit " LLFMT "usec\n", GDKusec() - t0);
+       if (res == GDK_SUCCEED) { /* now cleanup */
+               for(i=0;i<rcnt; i++) {
+                       if (lg->debug & 1) {
+                               fprintf(stderr, "release %d\n", r[i]);
+                               if (BBP_lrefs(r[i]) != 2)
+                                       fprintf(stderr, "release %d %d\n", 
r[i], BBP_lrefs(r[i]));
+                       }
+                       BBPrelease(r[i]);
+               }
+       }
        GDKfree(n);
+       GDKfree(r);
        GDKfree(sizes);
        if (res != GDK_SUCCEED)
                TRC_CRITICAL(GDK, "commit failed\n");
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to