Changeset: ec7d0f9d2e2d for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/ec7d0f9d2e2d Modified Files: clients/Tests/exports.stable.out sql/backends/monet5/sql_cat.c sql/server/rel_exp.c sql/server/rel_optimize_others.c sql/server/rel_psm.c sql/server/rel_select.c sql/server/rel_statistics.c sql/server/rel_unnest.c sql/server/rel_updates.c sql/server/sql_parser.y sql/storage/bat/bat_logger.c sql/storage/bat/bat_storage.c sql/storage/store.c sql/test/BugTracker-2024/Tests/7605_full_3_level_name_support.test sql/test/BugTracker-2025/Tests/7614_join_reordering.test sql/test/BugTracker-2025/Tests/7615_join_reordering_2.test sql/test/BugTracker-2025/Tests/7616_join_reordering_3.test sql/test/SQLancer/Tests/sqlancer10.test testing/sqllogictest.py Branch: nested Log Message:
merged with default diffs (truncated from 21356 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -545,7 +545,7 @@ gdk_return log_bat_group_start(logger *l gdk_return log_bat_persists(logger *lg, BAT *b, log_id id); gdk_return log_bat_transient(logger *lg, log_id id); lng log_changes(logger *lg); -gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt); +gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt); logger *log_create(int debug, const char *fn, const char *logdir, int version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata); gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id); void log_destroy(logger *lg); diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c --- a/gdk/gdk_logger.c +++ b/gdk/gdk_logger.c @@ -39,6 +39,7 @@ static gdk_return log_del_bat(logger *lg #define LOG_SEQ 7 #define LOG_CLEAR 8 /* DEPRECATED */ #define LOG_BAT_GROUP 9 +#define LOG_UPDATE_CB 10 #ifdef NATIVE_WIN32 #define getfilepos _ftelli64 @@ -65,6 +66,7 @@ static const char *log_commands[] = { "LOG_SEQ", "", /* LOG_CLEAR IS DEPRECATED */ "LOG_BAT_GROUP", + "LOG_UPDATE_CB", }; typedef struct logaction { @@ -457,6 +459,56 @@ log_read_updates(logger *lg, trans *tr, } return res; } + } else if (l->flag == LOG_UPDATE_CB) { + if (cands) { + bool append = (!lg->flushing && !skip_entry); + if (lg->flushing || skip_entry) { + /* when flushing, we only need the offset and count of the last segment of inserts. */ + assert((*cands)->ttype == TYPE_void); + BATtseqbase(*cands, (oid) 0); + BATsetcount(*cands, (BUN) nr); + } + + size_t snr = (size_t) nr; + BUN total = snr; + + if (append && (*cands)->ttype == TYPE_void) { + BBPreclaim(*cands); + *cands = COLnew(0, TYPE_oid, (BUN) nr, TRANSIENT); + } + oid *c = append?Tloc((*cands), 0):NULL; + while(snr) { + if (mnstr_readLng(lg->input_log, &nr) != 1 || + mnstr_readLng(lg->input_log, &offset) != 1) { + TRC_CRITICAL(GDK, "read failed\n"); + return LOG_EOF; + } + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); + if (t == NULL) { + TRC_CRITICAL(GDK, "read failed\n"); + return LOG_EOF; + } else if (append) { + lg->rbuf = t; + lg->rbufsize = tlen; + for (BUN p = 0; p < (BUN) nr; p++) + *c++ = offset++; + } + snr -= nr; + } + if (append) { + BATsetcount( *cands, total ); + (*cands)->tnonil = true; + (*cands)->tnil = false; + (*cands)->tseqbase = oid_nil; + (*cands)->tkey = true; + (*cands)->tsorted = true; + (*cands)->trevsorted = false; + (*cands)->tnorevsorted = 0; + (*cands)->tmaxpos = (*cands)->tminpos = BUN_NONE; + } + return res; + } } if (!lg->flushing && !skip_entry) { @@ -486,6 +538,58 @@ log_read_updates(logger *lg, trans *tr, } } } + } else if (l->flag == LOG_UPDATE_CB) { + size_t snr = (size_t) nr; + + uid = COLnew(0, TYPE_oid, (BUN) nr, TRANSIENT); + if (r && uid == NULL) { + if (r) + BBPreclaim(r); + return LOG_ERR; + } + oid *c = uid?Tloc(uid, 0):NULL; + BUN total = snr; + while(snr) { + if (mnstr_readLng(lg->input_log, &nr) != 1 || + mnstr_readLng(lg->input_log, &offset) != 1) { + if (r) + BBPreclaim(r); + TRC_CRITICAL(GDK, "read failed\n"); + return LOG_EOF; + } + size_t tlen = lg->rbufsize; + void *t = rt(lg->rbuf, &tlen, lg->input_log, 1); + if (t == NULL) { + TRC_CRITICAL(GDK, "read failed\n"); + res = LOG_EOF; + } else { + lg->rbuf = t; + lg->rbufsize = tlen; + if (r) { + for (BUN p = 0; p < (BUN) nr; p++) { + if (BUNappend(r, t, true) != GDK_SUCCEED) { + TRC_CRITICAL(GDK, "append to bat failed\n"); + res = LOG_ERR; + } + *c++ = offset++; + } + } + } + snr -= nr; + } + if (uid) { + BATsetcount( uid, total ); + uid->tnonil = true; + uid->tnil = false; + uid->tseqbase = oid_nil; + uid->tkey = true; + uid->tsorted = true; + uid->trevsorted = false; + uid->tnorevsorted = 0; + uid->tmaxpos = uid->tminpos = BUN_NONE; + } + offset -= pnr; + /* change into */ } else if (l->flag == LOG_UPDATE_BULK) { if (mnstr_readLng(lg->input_log, &offset) != 1) { if (r) @@ -661,7 +765,10 @@ log_read_updates(logger *lg, trans *tr, tr->changes[tr->nr].type = LOG_UPDATE; } } - if (l->flag == LOG_UPDATE_CONST) { + if (uid && l->flag == LOG_UPDATE_CB) { + assert(!cands); /* TODO: This might change in the future. */ + tr->changes[tr->nr].type = LOG_UPDATE; + } else if (l->flag == LOG_UPDATE_CONST || l->flag == LOG_UPDATE_CB) { assert(!cands); /* TODO: This might change in the future. */ tr->changes[tr->nr].type = LOG_UPDATE_BULK; } @@ -792,7 +899,6 @@ la_bat_updates(logger *lg, logaction *la b->tmaxpos = BUN_NONE; b->tkey = false; b->tsorted = false; - b->tkey = false; if (BUNreplace(b, q, t, true) != GDK_SUCCEED) { logbat_destroy(b); bat_iterator_end(&vi); @@ -1335,6 +1441,7 @@ log_read_transaction(logger *lg, BAT *id } skip_entry = (ids_to_omit && BUNfnd(ids_to_omit, &l.id) != BUN_NONE); switch (l.flag) { + case LOG_UPDATE_CB: case LOG_UPDATE_CONST: case LOG_UPDATE_BULK: case LOG_UPDATE: @@ -1417,6 +1524,7 @@ log_read_transaction(logger *lg, BAT *id case LOG_SEQ: err = log_read_seq(lg, &l); break; + case LOG_UPDATE_CB: case LOG_UPDATE_CONST: case LOG_UPDATE_BULK: case LOG_UPDATE: @@ -2962,16 +3070,55 @@ log_sequence(logger *lg, int seq, lng *i return 0; } -gdk_return -log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt) +static gdk_return +log_constant_bulk(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt) { bte tpe = find_type(lg, type); gdk_return ok = GDK_SUCCEED; - logformat l; - lng nr; - l.flag = LOG_UPDATE_CONST; - l.id = id; - nr = cnt; + lng nr = cnt; + + gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite; + + assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR); + if (lg->total_cnt == 0) { + logformat l; + l.flag = LOG_UPDATE_CB; + l.id = id; + if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR || + log_write_format(lg, &l) != GDK_SUCCEED || + !mnstr_writeLng(lg->current->output_log, total_cnt) || + mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) { + ok = GDK_FAIL; + goto bailout; + } + } + lg->total_cnt += cnt; + if (lg->total_cnt == total_cnt) /* This is the last to be logged part of this bat, we can already reset the total_cnt */ + lg->total_cnt = 0; + if (!mnstr_writeLng(lg->current->output_log, cnt) || + !mnstr_writeLng(lg->current->output_log, offset)) { /* offset = -1 indicates bat was logged in parts */ + ok = GDK_FAIL; + goto bailout; + } + + ok = wt(val, lg->current->output_log, 1); + + TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr); + + bailout: + if (ok != GDK_SUCCEED) { + ATOMIC_DEC(&lg->current->refcount); + const char *err = mnstr_peek_error(lg->current->output_log); + TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? err : ""); + } + return ok; +} + +gdk_return +log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt) +{ + lng nr = cnt; + gdk_return ok = GDK_SUCCEED; if (LOG_DISABLED(lg) || !nr) { /* logging is switched off */ @@ -2983,6 +3130,14 @@ log_constant(logger *lg, int type, const return ok; } + if (cnt != total_cnt) + return log_constant_bulk(lg, type, val, id, offset, cnt, total_cnt); + + bte tpe = find_type(lg, type); + logformat l; + l.flag = LOG_UPDATE_CONST; + l.id = id; + gdk_return(*wt) (const void *, stream *, size_t) = BATatoms[type].atomWrite; assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR); diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h --- a/gdk/gdk_logger.h +++ b/gdk/gdk_logger.h @@ -55,7 +55,7 @@ gdk_export lng log_changes(logger *lg); gdk_export int log_sequence(logger *lg, int seq, lng *id); /* todo pass the transaction id */ -gdk_export gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt); +gdk_export gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng cnt, lng total_cnt); gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng cnt, lng total_cnt); /* log slice from b */ gdk_export gdk_return log_bat_persists(logger *lg, BAT *b, log_id id); gdk_export gdk_return log_bat_transient(logger *lg, log_id id); diff --git a/monetdb5/modules/atoms/Tests/xml10.maltest b/monetdb5/modules/atoms/Tests/xml10.maltest --- a/monetdb5/modules/atoms/Tests/xml10.maltest +++ b/monetdb5/modules/atoms/Tests/xml10.maltest @@ -65,7 +65,7 @@ statement error io.print(l) statement ok -ag:= xml.subaggr(te,g,e,true) +ag:= aggr.subxmlaggr(te,g,e,true) statement error io.print(ag) @@ -93,7 +93,7 @@ io.print("book construction") "book construction" statement ok -be:= xml.aggr(ae) +be:= aggr.xmlaggr(ae) query T rowsort io.print(be) diff --git a/monetdb5/modules/atoms/batxml.c b/monetdb5/modules/atoms/batxml.c --- a/monetdb5/modules/atoms/batxml.c +++ b/monetdb5/modules/atoms/batxml.c @@ -1183,10 +1183,10 @@ BATXMLgroup(xml *ret, const bat *bid) const char *err = NULL; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org