Changeset: ec7d0f9d2e2d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ec7d0f9d2e2d
Modified Files:
clients/Tests/exports.stable.out
sql/backends/monet5/sql_cat.c
sql/server/rel_exp.c
sql/server/rel_optimize_others.c
sql/server/rel_psm.c
sql/server/rel_select.c
sql/server/rel_statistics.c
sql/server/rel_unnest.c
sql/server/rel_updates.c
sql/server/sql_parser.y
sql/storage/bat/bat_logger.c
sql/storage/bat/bat_storage.c
sql/storage/store.c
sql/test/BugTracker-2024/Tests/7605_full_3_level_name_support.test
sql/test/BugTracker-2025/Tests/7614_join_reordering.test
sql/test/BugTracker-2025/Tests/7615_join_reordering_2.test
sql/test/BugTracker-2025/Tests/7616_join_reordering_3.test
sql/test/SQLancer/Tests/sqlancer10.test
testing/sqllogictest.py
Branch: nested
Log Message:
merged with default
diffs (truncated from 21356 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -545,7 +545,7 @@ gdk_return log_bat_group_start(logger *l
gdk_return log_bat_persists(logger *lg, BAT *b, log_id id);
gdk_return log_bat_transient(logger *lg, log_id id);
lng log_changes(logger *lg);
-gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt);
+gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt);
logger *log_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata);
gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id);
void log_destroy(logger *lg);
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -39,6 +39,7 @@ static gdk_return log_del_bat(logger *lg
#define LOG_SEQ 7
#define LOG_CLEAR 8 /* DEPRECATED */
#define LOG_BAT_GROUP 9
+#define LOG_UPDATE_CB 10
#ifdef NATIVE_WIN32
#define getfilepos _ftelli64
@@ -65,6 +66,7 @@ static const char *log_commands[] = {
"LOG_SEQ",
"", /* LOG_CLEAR IS DEPRECATED */
"LOG_BAT_GROUP",
+ "LOG_UPDATE_CB",
};
typedef struct logaction {
@@ -457,6 +459,56 @@ log_read_updates(logger *lg, trans *tr,
}
return res;
}
+ } else if (l->flag == LOG_UPDATE_CB) {
+ if (cands) {
+ bool append = (!lg->flushing && !skip_entry);
+ if (lg->flushing || skip_entry) {
+ /* when flushing, we only need the
offset and count of the last segment of inserts. */
+ assert((*cands)->ttype == TYPE_void);
+ BATtseqbase(*cands, (oid) 0);
+ BATsetcount(*cands, (BUN) nr);
+ }
+
+ size_t snr = (size_t) nr;
+ BUN total = snr;
+
+ if (append && (*cands)->ttype == TYPE_void) {
+ BBPreclaim(*cands);
+ *cands = COLnew(0, TYPE_oid, (BUN) nr,
TRANSIENT);
+ }
+ oid *c = append?Tloc((*cands), 0):NULL;
+ while(snr) {
+ if (mnstr_readLng(lg->input_log, &nr)
!= 1 ||
+ mnstr_readLng(lg->input_log,
&offset) != 1) {
+ TRC_CRITICAL(GDK, "read
failed\n");
+ return LOG_EOF;
+ }
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen,
lg->input_log, 1);
+ if (t == NULL) {
+ TRC_CRITICAL(GDK, "read
failed\n");
+ return LOG_EOF;
+ } else if (append) {
+ lg->rbuf = t;
+ lg->rbufsize = tlen;
+ for (BUN p = 0; p < (BUN) nr;
p++)
+ *c++ = offset++;
+ }
+ snr -= nr;
+ }
+ if (append) {
+ BATsetcount( *cands, total );
+ (*cands)->tnonil = true;
+ (*cands)->tnil = false;
+ (*cands)->tseqbase = oid_nil;
+ (*cands)->tkey = true;
+ (*cands)->tsorted = true;
+ (*cands)->trevsorted = false;
+ (*cands)->tnorevsorted = 0;
+ (*cands)->tmaxpos = (*cands)->tminpos =
BUN_NONE;
+ }
+ return res;
+ }
}
if (!lg->flushing && !skip_entry) {
@@ -486,6 +538,58 @@ log_read_updates(logger *lg, trans *tr,
}
}
}
+ } else if (l->flag == LOG_UPDATE_CB) {
+ size_t snr = (size_t) nr;
+
+ uid = COLnew(0, TYPE_oid, (BUN) nr, TRANSIENT);
+ if (r && uid == NULL) {
+ if (r)
+ BBPreclaim(r);
+ return LOG_ERR;
+ }
+ oid *c = uid?Tloc(uid, 0):NULL;
+ BUN total = snr;
+ while(snr) {
+ if (mnstr_readLng(lg->input_log, &nr) != 1 ||
+ mnstr_readLng(lg->input_log, &offset) != 1)
{
+ if (r)
+ BBPreclaim(r);
+ TRC_CRITICAL(GDK, "read failed\n");
+ return LOG_EOF;
+ }
+ size_t tlen = lg->rbufsize;
+ void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
+ if (t == NULL) {
+ TRC_CRITICAL(GDK, "read failed\n");
+ res = LOG_EOF;
+ } else {
+ lg->rbuf = t;
+ lg->rbufsize = tlen;
+ if (r) {
+ for (BUN p = 0; p < (BUN) nr;
p++) {
+ if (BUNappend(r, t,
true) != GDK_SUCCEED) {
+
TRC_CRITICAL(GDK, "append to bat failed\n");
+ res = LOG_ERR;
+ }
+ *c++ = offset++;
+ }
+ }
+ }
+ snr -= nr;
+ }
+ if (uid) {
+ BATsetcount( uid, total );
+ uid->tnonil = true;
+ uid->tnil = false;
+ uid->tseqbase = oid_nil;
+ uid->tkey = true;
+ uid->tsorted = true;
+ uid->trevsorted = false;
+ uid->tnorevsorted = 0;
+ uid->tmaxpos = uid->tminpos = BUN_NONE;
+ }
+ offset -= pnr;
+ /* change into */
} else if (l->flag == LOG_UPDATE_BULK) {
if (mnstr_readLng(lg->input_log, &offset) != 1) {
if (r)
@@ -661,7 +765,10 @@ log_read_updates(logger *lg, trans *tr,
tr->changes[tr->nr].type =
LOG_UPDATE;
}
}
- if (l->flag == LOG_UPDATE_CONST) {
+ if (uid && l->flag == LOG_UPDATE_CB) {
+ assert(!cands); /* TODO: This might
change in the future. */
+ tr->changes[tr->nr].type = LOG_UPDATE;
+ } else if (l->flag == LOG_UPDATE_CONST ||
l->flag == LOG_UPDATE_CB) {
assert(!cands); /* TODO: This might
change in the future. */
tr->changes[tr->nr].type =
LOG_UPDATE_BULK;
}
@@ -792,7 +899,6 @@ la_bat_updates(logger *lg, logaction *la
b->tmaxpos = BUN_NONE;
b->tkey = false;
b->tsorted = false;
- b->tkey = false;
if (BUNreplace(b, q, t, true)
!= GDK_SUCCEED) {
logbat_destroy(b);
bat_iterator_end(&vi);
@@ -1335,6 +1441,7 @@ log_read_transaction(logger *lg, BAT *id
}
skip_entry = (ids_to_omit && BUNfnd(ids_to_omit, &l.id) !=
BUN_NONE);
switch (l.flag) {
+ case LOG_UPDATE_CB:
case LOG_UPDATE_CONST:
case LOG_UPDATE_BULK:
case LOG_UPDATE:
@@ -1417,6 +1524,7 @@ log_read_transaction(logger *lg, BAT *id
case LOG_SEQ:
err = log_read_seq(lg, &l);
break;
+ case LOG_UPDATE_CB:
case LOG_UPDATE_CONST:
case LOG_UPDATE_BULK:
case LOG_UPDATE:
@@ -2962,16 +3070,55 @@ log_sequence(logger *lg, int seq, lng *i
return 0;
}
-gdk_return
-log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt)
+static gdk_return
+log_constant_bulk(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt)
{
bte tpe = find_type(lg, type);
gdk_return ok = GDK_SUCCEED;
- logformat l;
- lng nr;
- l.flag = LOG_UPDATE_CONST;
- l.id = id;
- nr = cnt;
+ lng nr = cnt;
+
+ gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite;
+
+ assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+ if (lg->total_cnt == 0) {
+ logformat l;
+ l.flag = LOG_UPDATE_CB;
+ l.id = id;
+ if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+ log_write_format(lg, &l) != GDK_SUCCEED ||
+ !mnstr_writeLng(lg->current->output_log, total_cnt) ||
+ mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
+ ok = GDK_FAIL;
+ goto bailout;
+ }
+ }
+ lg->total_cnt += cnt;
+ if (lg->total_cnt == total_cnt) /* This is the last to be logged part
of this bat, we can already reset the total_cnt */
+ lg->total_cnt = 0;
+ if (!mnstr_writeLng(lg->current->output_log, cnt) ||
+ !mnstr_writeLng(lg->current->output_log, offset)) { /* offset = -1
indicates bat was logged in parts */
+ ok = GDK_FAIL;
+ goto bailout;
+ }
+
+ ok = wt(val, lg->current->output_log, 1);
+
+ TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
+
+ bailout:
+ if (ok != GDK_SUCCEED) {
+ ATOMIC_DEC(&lg->current->refcount);
+ const char *err = mnstr_peek_error(lg->current->output_log);
+ TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ?
err : "");
+ }
+ return ok;
+}
+
+gdk_return
+log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt)
+{
+ lng nr = cnt;
+ gdk_return ok = GDK_SUCCEED;
if (LOG_DISABLED(lg) || !nr) {
/* logging is switched off */
@@ -2983,6 +3130,14 @@ log_constant(logger *lg, int type, const
return ok;
}
+ if (cnt != total_cnt)
+ return log_constant_bulk(lg, type, val, id, offset, cnt,
total_cnt);
+
+ bte tpe = find_type(lg, type);
+ logformat l;
+ l.flag = LOG_UPDATE_CONST;
+ l.id = id;
+
gdk_return(*wt) (const void *, stream *, size_t) =
BATatoms[type].atomWrite;
assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -55,7 +55,7 @@ gdk_export lng log_changes(logger *lg);
gdk_export int log_sequence(logger *lg, int seq, lng *id);
/* todo pass the transaction id */
-gdk_export gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt);
+gdk_export gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt);
gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt); /* log slice from b */
gdk_export gdk_return log_bat_persists(logger *lg, BAT *b, log_id id);
gdk_export gdk_return log_bat_transient(logger *lg, log_id id);
diff --git a/monetdb5/modules/atoms/Tests/xml10.maltest b/monetdb5/modules/atoms/Tests/xml10.maltest
--- a/monetdb5/modules/atoms/Tests/xml10.maltest
+++ b/monetdb5/modules/atoms/Tests/xml10.maltest
@@ -65,7 +65,7 @@ statement error
io.print(l)
statement ok
-ag:= xml.subaggr(te,g,e,true)
+ag:= aggr.subxmlaggr(te,g,e,true)
statement error
io.print(ag)
@@ -93,7 +93,7 @@ io.print("book construction")
"book construction"
statement ok
-be:= xml.aggr(ae)
+be:= aggr.xmlaggr(ae)
query T rowsort
io.print(be)
diff --git a/monetdb5/modules/atoms/batxml.c b/monetdb5/modules/atoms/batxml.c
--- a/monetdb5/modules/atoms/batxml.c
+++ b/monetdb5/modules/atoms/batxml.c
@@ -1183,10 +1183,10 @@ BATXMLgroup(xml *ret, const bat *bid)
const char *err = NULL;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]