Changeset: f1442ffa8506 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/f1442ffa8506
Modified Files:
gdk/gdk_bbp.c
sql/storage/bat/bat_storage.c
sql/test/concurrent/Tests/All
Branch: Sep2022
Log Message:
Merge with Jan2022 branch.
diffs (206 lines):
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -951,7 +951,8 @@ BBPcheckbats(unsigned bbpversion)
if (statb.st_size > (off_t) hfree) {
int fd;
if ((fd = MT_open(path, O_RDWR | O_CLOEXEC |
O_BINARY)) >= 0) {
- (void) ftruncate(fd, hfree);
+ if (ftruncate(fd, hfree) == -1)
+ perror("ftruncate");
(void) close(fd);
}
}
@@ -979,7 +980,8 @@ BBPcheckbats(unsigned bbpversion)
if (statb.st_size > (off_t) hfree) {
int fd;
if ((fd = MT_open(path, O_RDWR | O_CLOEXEC |
O_BINARY)) >= 0) {
- (void) ftruncate(fd, hfree);
+ if (ftruncate(fd, hfree) == -1)
+ perror("ftruncate");
(void) close(fd);
}
}
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -470,13 +470,6 @@ new_segments(sql_trans *tr, size_t cnt)
return n;
}
-static segments*
-dup_segments(segments *s)
-{
- sql_ref_inc(&s->r);
- return s;
-}
-
static int
temp_dup_cs(column_storage *cs, ulng tid, int type)
{
@@ -2332,15 +2325,11 @@ delta_append_val(sql_trans *tr, sql_delt
}
static int
-dup_storage( sql_trans *tr, storage *obat, storage *bat, int temp)
+dup_storage( sql_trans *tr, storage *obat, storage *bat)
{
- if (temp) {
- if (!(bat->segs = new_segments(tr, 0)))
- return LOG_ERR;
- } else {
- bat->segs = dup_segments(obat->segs);
- }
- return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, temp);
+ if (!(bat->segs = new_segments(tr, 0)))
+ return LOG_ERR;
+ return dup_cs(tr, &obat->cs, &bat->cs, TYPE_msk, 1);
}
static int
@@ -2649,40 +2638,55 @@ segments_conflict(sql_trans *tr, segment
return 0;
}
+static int clear_storage(sql_trans *tr, sql_table *t, storage *s);
+
static storage *
bind_del_data(sql_trans *tr, sql_table *t, bool *clear)
{
- storage *obat = ATOMIC_PTR_GET(&t->data);
-
- if (isTempTable(t) && !(obat = temp_tab_timestamp_storage(tr, t)))
- return NULL;
-
- if (obat->cs.ts == tr->tid)
+ storage *obat;
+
+ if (isTempTable(t)) {
+ if (!(obat = temp_tab_timestamp_storage(tr, t)))
+ return NULL;
+
+ assert(obat->cs.ts == tr->tid);
+
+ if (clear && clear_storage(tr, t, obat) != LOG_OK)
+ return NULL;
+
return obat;
- if ((!tr->parent || !tr_version_of_parent(tr, obat->cs.ts)) && obat->cs.ts >= TRANSACTION_ID_BASE && !isTempTable(t)) {
- /* abort */
- if (clear)
- *clear = true;
- return NULL;
}
- if (!isTempTable(t) && !clear)
+
+ obat = ATOMIC_PTR_GET(&t->data);
+
+ if (obat->cs.ts != tr->tid)
+ if (!tr->parent || !tr_version_of_parent(tr, obat->cs.ts))
+ if (obat->cs.ts >= TRANSACTION_ID_BASE) {
+ /* abort */
+ if (clear)
+ *clear = true;
+ return NULL;
+ }
+
+ if (!clear)
return obat;
- if (!isTempTable(t) && clear && segments_conflict(tr, obat->segs, 1)) {
+
+ /* remainder is only to handle clear */
+ if (segments_conflict(tr, obat->segs, 1)) {
*clear = true;
return NULL;
}
-
- assert(!isTempTable(t));
if (!(obat = timestamp_storage(tr, ATOMIC_PTR_GET(&t->data))))
return NULL;
storage *bat = ZNEW(storage);
if (!bat)
return NULL;
bat->cs.refcnt = 1;
- if (dup_storage(tr, obat, bat, clear || isTempTable(t) /* for clear and temp create empty storage */) != LOG_OK) {
+ if (dup_storage(tr, obat, bat) != LOG_OK) {
destroy_storage(bat);
return NULL;
}
+ bat->cs.cleared = true;
bat->cs.ts = tr->tid;
/* only one writer else abort */
bat->next = obat;
@@ -3811,8 +3815,6 @@ clear_del(sql_trans *tr, sql_table *t, i
}
if ((!inTransaction(tr, t) && !in_transaction && isGlobal(t)) ||
(!isNew(t) && isLocalTemp(t)))
trans_add(tr, &t->base, bat, &tc_gc_del, &commit_update_del,
isTempTable(t) || isUnloggedTable(t) ? NULL : &log_update_del);
- if (clear && ok == LOG_OK)
- return clear_storage(tr, t, bat);
if (ok == LOG_ERR)
return BUN_NONE;
if (ok == LOG_CONFLICT)
diff --git a/sql/test/concurrent/Tests/All b/sql/test/concurrent/Tests/All
--- a/sql/test/concurrent/Tests/All
+++ b/sql/test/concurrent/Tests/All
@@ -1,4 +1,5 @@
simple_select
crash_on_concurrent_use.SF-1411926
segments-corruption
+read-segment-after-free
smart-segment-merge
diff --git a/sql/test/concurrent/Tests/read-segment-after-free.SQL.py b/sql/test/concurrent/Tests/read-segment-after-free.SQL.py
new file mode 100644
--- /dev/null
+++ b/sql/test/concurrent/Tests/read-segment-after-free.SQL.py
@@ -0,0 +1,51 @@
+import os, random, pymonetdb
+from concurrent.futures import ProcessPoolExecutor
+import time
+from pymonetdb.exceptions import OperationalError
+
+init = '''
+ drop table if exists foo;
+ create table foo (c1, c2, c3, c4, c5) AS VALUES
+ (10, 20, 30, 40, 50),
+ (11, 21, 31, 41, 51),
+ (12, 22, 32, 42, 52);
+ '''
+
+query = """
+ truncate foo;
+ insert into foo VALUES
+ (10, 20, 30, 40, 50),
+ (11, 21, 31, 41, 51),
+ (12, 22, 32, 42, 52);
+ """
+
+h = os.getenv('MAPIHOST')
+p = int(os.getenv('MAPIPORT'))
+db = os.getenv('TSTDB')
+
+nr_queries = 1000
+nr_clients = 16
+
+conn = pymonetdb.connect(hostname=h, port=p,database=db, autocommit=True)
+cursor = conn.cursor()
+
+try:
+ cursor.execute(init)
+except Exception as e:
+ print(e)
+ exit(1)
+
+def client(_):
+ conn = pymonetdb.connect(hostname=h, port=p,database=db, autocommit=True)
+ cursor = conn.cursor()
+ cursor.execute("set optimizer = 'minimal_fast';")
+
+ for x in range(0, nr_queries):
+ try:
+ cursor.execute(query)
+ except OperationalError as e:
+ # concurrency conflicts are allowed
+ pass
+
+with ProcessPoolExecutor(nr_clients) as pool:
+ pool.map(client, range(nr_clients))
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]