Changeset: 64ddac7eabcd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/64ddac7eabcd
Modified Files:
sql/include/sql_catalog.h
sql/storage/bat/bat_storage.c
sql/storage/objectset.c
sql/storage/sql_storage.h
sql/storage/store.c
Branch: smart-merge
Log Message:
Implement smarter segment merging
diffs (truncated from 370 to 300 lines):
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -235,7 +235,7 @@ struct os_iter {
/* transaction changes */
typedef int (*tc_valid_fptr) (struct sql_trans *tr, struct sql_change *c/*,
ulng commit_ts, ulng oldest*/);
typedef int (*tc_log_fptr) (struct sql_trans *tr, struct sql_change *c);
/* write changes to the
log */
-typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c,
ulng commit_ts, ulng oldest);/* commit/rollback changes */
+typedef int (*tc_commit_fptr) (struct sql_trans *tr, struct sql_change *c,
ulng commit_ts, ulng oldest, ulng *active, ulng latest);/* commit/rollback
changes */
typedef int (*tc_cleanup_fptr) (sql_store store, struct sql_change *c, ulng
oldest); /* garbage collection, ie cleanup structures when possible */
typedef void (*destroy_fptr)(sql_store store, sql_base *b);
typedef int (*validate_fptr)(struct sql_trans *tr, sql_base *b, int delete);
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -19,15 +19,15 @@
static int log_update_col( sql_trans *tr, sql_change *c);
static int log_update_idx( sql_trans *tr, sql_change *c);
static int log_update_del( sql_trans *tr, sql_change *c);
-static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest);
-static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest);
-static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest);
+static int commit_update_col( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest, ulng *active, ulng latest);
+static int commit_update_idx( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest, ulng *active, ulng latest);
+static int commit_update_del( sql_trans *tr, sql_change *c, ulng commit_ts,
ulng oldest, ulng *active, ulng latest);
static int log_create_col(sql_trans *tr, sql_change *change);
static int log_create_idx(sql_trans *tr, sql_change *change);
static int log_create_del(sql_trans *tr, sql_change *change);
-static int commit_create_col(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest);
-static int commit_create_idx(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest);
-static int commit_create_del(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest);
+static int commit_create_col(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest, ulng *active, ulng latest);
+static int commit_create_idx(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest, ulng *active, ulng latest);
+static int commit_create_del(sql_trans *tr, sql_change *change, ulng
commit_ts, ulng oldest, ulng *active, ulng latest);
static int tc_gc_col( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_idx( sql_store Store, sql_change *c, ulng oldest);
static int tc_gc_del( sql_store Store, sql_change *c, ulng oldest);
@@ -363,7 +363,7 @@ segments2cs(sql_trans *tr, segments *seg
/* TODO return LOG_OK/ERR */
static void
-merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts,
ulng oldest)
+merge_segments(storage *s, sql_trans *tr, sql_change *change, ulng commit_ts,
ulng oldest, ulng *active, ulng latest)
{
segment *cur = s->segs->h, *seg = NULL;
for (; cur; cur = cur->next) {
@@ -372,22 +372,45 @@ merge_segments(storage *s, sql_trans *tr
cur->oldts = 0;
cur->ts = commit_ts;
}
- if (cur->ts <= oldest && cur->ts < TRANSACTION_ID_BASE) { /*
possibly merge range */
- if (!seg) { /* skip first */
+
+ if (!seg) {
+ /* first segment */
+ seg = cur;
+ }
+ else {
+ /* possible merge since both deleted flags are equal
+ and the timestamp is lesser than the latest one
+ when the active transactions were computed */
+ if (seg->deleted == cur->deleted && seg->ts < latest &&
cur->ts < TRANSACTION_ID_BASE) {
+ int merge = 1;
+ for (int i = 0; active[i] != 0; i++) {
+ if ((active[i] >= seg->ts && active[i]
<= cur->ts)
+ || (active[i] <= seg->ts &&
active[i] >= cur->ts)) {
+ /* cannot safely merge since
there is an active transaction between the segments */
+ merge = 0;
+ break;
+ }
+ }
+ /* merge segments */
+ if (merge) {
+ seg->end = cur->end;
+ seg->next = cur->next;
+ if (cur == s->segs->t)
+ s->segs->t = seg;
+ if (commit_ts == oldest)
+ _DELETE(cur);
+ else
+ mark4destroy(cur, change,
commit_ts);
+ cur = seg;
+ }
+ /* skip merge */
+ else {
+ seg = cur;
+ }
+ }
+ /* skip merge */
+ else {
seg = cur;
- } else if (seg->end == cur->start && seg->deleted ==
cur->deleted) {
- /* merge with previous */
- seg->end = cur->end;
- seg->next = cur->next;
- if (cur == s->segs->t)
- s->segs->t = seg;
- if (commit_ts == oldest)
- _DELETE(cur);
- else
- mark4destroy(cur, change, commit_ts);
- cur = seg;
- } else {
- seg = cur; /* begin of new merge */
}
}
}
@@ -3132,10 +3155,12 @@ log_create_col(sql_trans *tr, sql_change
}
static int
-commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
+commit_create_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest,
ulng *active, ulng latest)
{
int ok = LOG_OK;
(void)oldest;
+ (void)active;
+ (void)latest;
if(!isTempTable(c->t)) {
sql_delta *delta = ATOMIC_PTR_GET(&c->data);
@@ -3153,12 +3178,12 @@ commit_create_col_( sql_trans *tr, sql_c
}
static int
-commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_create_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
sql_column *c = (sql_column*)change->obj;
if (!tr->parent)
c->base.new = 0;
- return commit_create_col_( tr, c, commit_ts, oldest);
+ return commit_create_col_( tr, c, commit_ts, oldest, active, latest);
}
/* will be called for new idx's and when new index columns are created */
@@ -3238,10 +3263,12 @@ log_create_idx(sql_trans *tr, sql_change
}
static int
-commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest)
+commit_create_idx_( sql_trans *tr, sql_idx *i, ulng commit_ts, ulng oldest,
ulng *active, ulng latest)
{
int ok = LOG_OK;
(void)oldest;
+ (void)active;
+ (void)latest;
if(!isTempTable(i->t)) {
sql_delta *delta = ATOMIC_PTR_GET(&i->data);
@@ -3258,12 +3285,12 @@ commit_create_idx_( sql_trans *tr, sql_i
}
static int
-commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_create_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
sql_idx *i = (sql_idx*)change->obj;
if (!tr->parent)
i->base.new = 0;
- return commit_create_idx_(tr, i, commit_ts, oldest);
+ return commit_create_idx_(tr, i, commit_ts, oldest, active, latest);
}
static int
@@ -3475,7 +3502,7 @@ log_create_del(sql_trans *tr, sql_change
}
static int
-commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_create_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
int ok = LOG_OK;
sql_table *t = (sql_table*)change->obj;
@@ -3488,21 +3515,21 @@ commit_create_del( sql_trans *tr, sql_ch
assert(ok == LOG_OK);
if (ok != LOG_OK)
return ok;
- merge_segments(dbat, tr, change, commit_ts, commit_ts/* create
is we are alone */ /*oldest*/);
+ merge_segments(dbat, tr, change, commit_ts, commit_ts, active,
latest/* create is we are alone */ /*oldest*/);
assert(dbat->cs.ts == tr->tid);
dbat->cs.ts = commit_ts;
if (ok == LOG_OK) {
for(node *n = ol_first_node(t->columns); n && ok ==
LOG_OK; n = n->next) {
sql_column *c = n->data;
- ok = commit_create_col_(tr, c, commit_ts,
oldest);
+ ok = commit_create_col_(tr, c, commit_ts,
oldest, active, latest);
}
if (t->idxs) {
for(node *n = ol_first_node(t->idxs); n && ok
== LOG_OK; n = n->next) {
sql_idx *i = n->data;
if (ATOMIC_PTR_GET(&i->data))
- ok = commit_create_idx_(tr, i,
commit_ts, oldest);
+ ok = commit_create_idx_(tr, i,
commit_ts, oldest, active, latest);
}
}
if (!tr->parent)
@@ -3640,12 +3667,14 @@ log_destroy_del(sql_trans *tr, sql_chang
}
static int
-commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_destroy_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
(void)tr;
(void)change;
(void)commit_ts;
(void)oldest;
+ (void)active;
+ (void)latest;
return 0;
}
@@ -4123,12 +4152,14 @@ log_update_col( sql_trans *tr, sql_chang
}
static int
-commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest)
+commit_update_col_( sql_trans *tr, sql_column *c, ulng commit_ts, ulng oldest,
ulng *active, ulng latest)
{
int ok = LOG_OK;
sql_delta *delta = ATOMIC_PTR_GET(&c->data);
(void)oldest;
+ (void)active;
+ (void)latest;
if (isTempTable(c->t)) {
if (commit_ts) { /* commit */
if (c->t->commit_action == CA_COMMIT ||
c->t->commit_action == CA_PRESERVE) {
@@ -4182,14 +4213,14 @@ tc_gc_rollbacked_storage( sql_store Stor
static int
-commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_update_col( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
int ok = LOG_OK;
sql_column *c = (sql_column*)change->obj;
sql_delta *delta = ATOMIC_PTR_GET(&c->data);
if (isTempTable(c->t))
- return commit_update_col_(tr, c, commit_ts, oldest);
+ return commit_update_col_(tr, c, commit_ts, oldest, active,
latest);
if (commit_ts)
delta->cs.ts = commit_ts;
if (!commit_ts) { /* rollback */
@@ -4263,11 +4294,13 @@ commit_update_idx_( sql_trans *tr, sql_i
}
static int
-commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_update_idx( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
int ok = LOG_OK;
sql_idx *i = (sql_idx*)change->obj;
sql_delta *delta = ATOMIC_PTR_GET(&i->data);
+ (void)active;
+ (void)latest;
if (isTempTable(i->t))
return commit_update_idx_( tr, i, commit_ts, oldest);
@@ -4347,7 +4380,7 @@ commit_storage(sql_trans *tr, storage *d
}
static int
-commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest)
+commit_update_del( sql_trans *tr, sql_change *change, ulng commit_ts, ulng
oldest, ulng *active, ulng latest)
{
int ok = LOG_OK;
sql_table *t = (sql_table*)change->obj;
@@ -4400,11 +4433,11 @@ commit_update_del( sql_trans *tr, sql_ch
ok = segments2cs(tr, dbat->segs, &dbat->cs);
assert(ok == LOG_OK);
if (ok == LOG_OK)
- merge_segments(dbat, tr, change, commit_ts, oldest);
+ merge_segments(dbat, tr, change, commit_ts, oldest,
active, latest);
if (ok == LOG_OK && dbat == d && oldest == commit_ts)
ok = merge_storage(dbat);
} else if (ok == LOG_OK && tr->parent) {/* cleanup older save points */
- merge_segments(dbat, tr, change, commit_ts, oldest);
+ merge_segments(dbat, tr, change, commit_ts, oldest, active,
latest);
ATOMIC_PTR_SET(&t->data, savepoint_commit_storage(dbat,
commit_ts));
}
unlock_table(tr->store, t->base.id);
diff --git a/sql/storage/objectset.c b/sql/storage/objectset.c
--- a/sql/storage/objectset.c
+++ b/sql/storage/objectset.c
@@ -607,7 +607,7 @@ tc_gc_objectversion(sql_store store, sql
}
static int
-tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts,
ulng oldest)
+tc_commit_objectversion(sql_trans *tr, sql_change *change, ulng commit_ts,
ulng oldest, ulng *active_, ulng latest)
{
objectversion *ov = (objectversion*)change->data;
if (commit_ts) {
@@ -615,6 +615,8 @@ tc_commit_objectversion(sql_trans *tr, s
ov->ts = commit_ts;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]