Changeset: 0d45b03e2bff for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/0d45b03e2bff
Modified Files:
        sql/storage/bat/bat_storage.c
        sql/storage/objectset.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: default
Log Message:

Reapply changeset e9a9d667c7f9


diffs (248 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -4147,6 +4147,9 @@ commit_update_col( sql_trans *tr, sql_ch
        ATOMIC_PTR_TYPE* data = &c->data;
        int type = c->type.type->localtype;
 
+       if (isDeleted(c->t))
+               return LOG_OK;
+
        return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
 }
 
@@ -4173,6 +4176,9 @@ commit_update_idx( sql_trans *tr, sql_ch
        ATOMIC_PTR_TYPE* data = &i->data;
        int type = (oid_index(i->type))?TYPE_oid:TYPE_lng;
 
+       if (isDeleted(i->t))
+               return LOG_OK;
+
        return commit_update_delta(tr, change, t, base, data, type, commit_ts, oldest);
 }
 
@@ -4213,6 +4219,9 @@ commit_update_del( sql_trans *tr, sql_ch
        sql_table *t = (sql_table*)change->obj;
        storage *dbat = ATOMIC_PTR_GET(&t->data);
 
+       if (isDeleted(t))
+               return ok;
+
        if (t->commit_action == CA_DELETE || t->commit_action == CA_DROP) {
                assert(isTempTable(t));
                if ((ok = clear_storage(tr, t, dbat)) == LOG_OK)
@@ -4410,7 +4419,7 @@ claim_segmentsV2(sql_trans *tr, sql_tabl
 {
        int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
        assert(s->segs);
-       ulng oldest = store_oldest(tr->store);
+       ulng oldest = store_oldest(tr->store, NULL);
        BUN slot = 0;
        size_t total = cnt;
 
@@ -4470,7 +4479,7 @@ claim_segmentsV2(sql_trans *tr, sql_tabl
        if (!locked)
                unlock_table(tr->store, t->base.id);
 
-       if (ok == LOG_OK) {     
+       if (ok == LOG_OK) {
                /* hard to only add this once per transaction (probably want to change to once per new segment) */
                if (!in_transaction) {
                        trans_add(tr, &t->base, s, &tc_gc_del, &commit_update_del, NOT_TO_BE_LOGGED(t) ? NULL : &log_update_del);
@@ -4499,7 +4508,7 @@ claim_segments(sql_trans *tr, sql_table 
                return claim_segmentsV2(tr, t, s, cnt, offset, offsets, locked);
        int in_transaction = segments_in_transaction(tr, t), ok = LOG_OK;
        assert(s->segs);
-       ulng oldest = store_oldest(tr->store);
+       ulng oldest = store_oldest(tr->store, NULL);
        BUN slot = 0;
        int reused = 0;
 
diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -401,7 +401,7 @@ objectversion_destroy(sqlstore *store, o
                node_destroy(ov->os, store, ov->id_based_head);
        }
 
-       if (os->destroy)
+       if (os->destroy && ov->b)
                os->destroy(store, ov->b);
 
        if (os->temporary && (state & deleted || state & under_destruction || state & rollbacked))
@@ -500,6 +500,18 @@ os_rollback(objectversion *ov, sqlstore 
        return LOG_OK;
 }
 
+static void
+ov_destroy_obj_recursive(sqlstore* store, objectversion *ov)
+{
+       if (ov->id_based_older && ov->id_based_older == ov->name_based_older) {
+               ov_destroy_obj_recursive(store, ov->id_based_older);
+       }
+       if (ov->os->destroy && ov->b) {
+               ov->os->destroy(store, ov->b);
+               ov->b = NULL;
+       }
+}
+
 static inline void
 try_to_mark_deleted_for_destruction(sqlstore* store, objectversion *ov)
 {
@@ -525,6 +537,7 @@ try_to_mark_deleted_for_destruction(sqls
                }
 
                ov->ts = store_get_timestamp(store)+1;
+               ov_destroy_obj_recursive(store, ov);
        }
 }
 
@@ -577,6 +590,7 @@ os_cleanup(sqlstore* store, objectversio
                if (ov->ts <= oldest) {
                        // the oldest relevant state is deleted so lets try to mark it as destroyed
                        try_to_mark_deleted_for_destruction(store, ov);
+                       return LOG_OK+1;
                }
 
                // Keep it inplace on the cleanup list, either because it is now marked for destruction or
@@ -603,14 +617,14 @@ os_cleanup(sqlstore* store, objectversio
 static int
 tc_gc_objectversion(sql_store store, sql_change *change, ulng oldest)
 {
-       assert(!change->handled);
+//     assert(!change->handled);
        objectversion *ov = (objectversion*)change->data;
 
        if (oldest >= TRANSACTION_ID_BASE)
                return 0;
        int res = os_cleanup( (sqlstore*) store, ov, oldest);
        change->handled = (res)?true:false;
-       return res;
+       return res>=0?LOG_OK:LOG_ERR;
 }
 
 static int
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -345,7 +345,7 @@ extern lng store_hot_snapshot_to_stream(
 
 extern ulng store_function_counter(struct sqlstore *store);
 
-extern ulng store_oldest(struct sqlstore *store);
+extern ulng store_oldest(struct sqlstore *store, sql_trans *tr);
 extern ulng store_get_timestamp(struct sqlstore *store);
 extern void store_manager(struct sqlstore *store);
 
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -52,8 +52,15 @@ store_transaction_id(sqlstore *store)
 }
 
 ulng
-store_oldest(sqlstore *store)
-{
+store_oldest(sqlstore *store, sql_trans *tr)
+{
+       if (tr && tr->ts == store->oldest) {
+               sql_session *s = store->active->h->data;
+               if (s->tr == tr && store->active->h->next) {
+                       s = store->active->h->next->data;
+                       return s->tr->ts;
+               }
+       }
        return store->oldest;
 }
 
@@ -2334,7 +2341,7 @@ id_hash_clear_older(sql_hash *h, ulng ol
 }
 
 static void
-store_pending_changes(sqlstore *store, ulng oldest)
+store_pending_changes(sqlstore *store, ulng oldest, sql_trans *tr)
 {
        ulng oldest_changes = store_get_timestamp(store);
        if (!list_empty(store->changes)) { /* lets first cleanup old stuff */
@@ -2346,7 +2353,7 @@ store_pending_changes(sqlstore *store, u
                        if (c->cleanup(store, c, oldest)) {
                                list_remove_node(store->changes, store, n);
                                _DELETE(c);
-                       } else if (c->ts < oldest_changes) {
+                       } else if (!c->handled && c->ts < oldest_changes) {
                                oldest_changes = c->ts;
                        }
                        n = next;
@@ -2356,7 +2363,7 @@ store_pending_changes(sqlstore *store, u
                dep_hash_clear(store->dependencies);
                dep_hash_clear(store->depchanges);
        } else {
-               ulng stoldest = store_oldest(store);
+               ulng stoldest = store_oldest(store, tr);
                id_hash_clear_older(store->dependencies, stoldest);
                id_hash_clear_older(store->depchanges, stoldest);
        }
@@ -2380,7 +2387,7 @@ store_manager(sqlstore *store)
                        store_lock(store);
                        if (ATOMIC_GET(&store->nr_active) == 0) {
                                ulng oldest = store_timestamp(store)+1;
-                               store_pending_changes(store, oldest);
+                               store_pending_changes(store, oldest, NULL);
                        }
                        store_unlock(store);
                        MT_lock_set(&store->flush);
@@ -3664,7 +3671,7 @@ sql_trans_rollback(sql_trans *tr, bool c
                if (!commit_lock)
                        MT_lock_set(&store->commit);
                store_lock(store);
-               ulng oldest = store_oldest(store);
+               ulng oldest = store_oldest(store, tr);
                ulng commit_ts = store_get_timestamp(store); /* use most recent timestamp such that we can cleanup savely */
                for(node *n=nl->h; n; n = n->next) {
                        sql_change *c = n->data;
@@ -3673,7 +3680,7 @@ sql_trans_rollback(sql_trans *tr, bool c
                                c->commit(tr, c, 0 /* ie rollback */, oldest);
                        c->ts = commit_ts;
                }
-               store_pending_changes(store, oldest);
+               store_pending_changes(store, oldest, tr);
                for(node *n=nl->h; n; n = n->next) {
                        sql_change *c = n->data;
 
@@ -3695,8 +3702,8 @@ sql_trans_rollback(sql_trans *tr, bool c
                if (!commit_lock)
                        MT_lock_set(&store->commit);
                store_lock(store);
-               ulng oldest = store_oldest(store);
-               store_pending_changes(store, oldest);
+               ulng oldest = store_oldest(store, tr);
+               store_pending_changes(store, oldest, tr);
                store_unlock(store);
                if (!commit_lock)
                        MT_lock_unset(&store->commit);
@@ -4006,14 +4013,14 @@ sql_trans_commit(sql_trans *tr)
                }
                else {
                        commit_ts = store_timestamp(store);
-                       oldest = store_oldest(store);
+                       oldest = store_oldest(store, tr);
                }
                tr->logchanges = 0;
                TRC_DEBUG(SQL_STORE, "Forwarding changes (" ULLFMT ", " ULLFMT ") -> " ULLFMT "\n", tr->tid, tr->ts, commit_ts);
                /* apply committed changes */
                if (ATOMIC_GET(&store->nr_active) == 1 && !tr->parent)
                        oldest = commit_ts;
-               store_pending_changes(store, oldest);
+               store_pending_changes(store, oldest, tr);
                for(node *n=tr->changes->h; n && ok == LOG_OK; n = n->next) {
                        sql_change *c = n->data;
 
@@ -4073,7 +4080,7 @@ sql_trans_commit(sql_trans *tr)
                MT_lock_set(&store->commit);
                store_lock(store);
                ulng oldest = store_timestamp(store);
-               store_pending_changes(store, oldest);
+               store_pending_changes(store, oldest, tr);
                store_unlock(store);
                MT_lock_unset(&store->commit);
        }
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to