Changeset: ec7d0f9d2e2d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ec7d0f9d2e2d
Modified Files:
        clients/Tests/exports.stable.out
        sql/backends/monet5/sql_cat.c
        sql/server/rel_exp.c
        sql/server/rel_optimize_others.c
        sql/server/rel_psm.c
        sql/server/rel_select.c
        sql/server/rel_statistics.c
        sql/server/rel_unnest.c
        sql/server/rel_updates.c
        sql/server/sql_parser.y
        sql/storage/bat/bat_logger.c
        sql/storage/bat/bat_storage.c
        sql/storage/store.c
        sql/test/BugTracker-2024/Tests/7605_full_3_level_name_support.test
        sql/test/BugTracker-2025/Tests/7614_join_reordering.test
        sql/test/BugTracker-2025/Tests/7615_join_reordering_2.test
        sql/test/BugTracker-2025/Tests/7616_join_reordering_3.test
        sql/test/SQLancer/Tests/sqlancer10.test
        testing/sqllogictest.py
Branch: nested
Log Message:

merged with default


diffs (truncated from 21356 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -545,7 +545,7 @@ gdk_return log_bat_group_start(logger *l
 gdk_return log_bat_persists(logger *lg, BAT *b, log_id id);
 gdk_return log_bat_transient(logger *lg, log_id id);
 lng log_changes(logger *lg);
-gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng 
offset, lng cnt);
+gdk_return log_constant(logger *lg, int type, const void *val, log_id id, lng 
offset, lng cnt, lng total_cnt);
 logger *log_create(int debug, const char *fn, const char *logdir, int version, 
preversionfix_fptr prefuncp, postversionfix_fptr postfuncp, void *funcdata);
 gdk_return log_delta(logger *lg, BAT *uid, BAT *uval, log_id id);
 void log_destroy(logger *lg);
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -39,6 +39,7 @@ static gdk_return log_del_bat(logger *lg
 #define LOG_SEQ                7
 #define LOG_CLEAR      8       /* DEPRECATED */
 #define LOG_BAT_GROUP  9
+#define LOG_UPDATE_CB  10
 
 #ifdef NATIVE_WIN32
 #define getfilepos _ftelli64
@@ -65,6 +66,7 @@ static const char *log_commands[] = {
        "LOG_SEQ",
        "",                     /* LOG_CLEAR IS DEPRECATED */
        "LOG_BAT_GROUP",
+       "LOG_UPDATE_CB",
 };
 
 typedef struct logaction {
@@ -457,6 +459,56 @@ log_read_updates(logger *lg, trans *tr, 
                                }
                                return res;
                        }
+               } else if (l->flag == LOG_UPDATE_CB) {
+                       if (cands) {
+                               bool append = (!lg->flushing && !skip_entry);
+                               if (lg->flushing || skip_entry) {
+                                       /* when flushing, we only need the 
offset and count of the last segment of inserts. */
+                                       assert((*cands)->ttype == TYPE_void);
+                                       BATtseqbase(*cands, (oid) 0);
+                                       BATsetcount(*cands, (BUN) nr);
+                               }
+
+                               size_t snr = (size_t) nr;
+                               BUN total = snr;
+
+                               if (append && (*cands)->ttype == TYPE_void) {
+                                       BBPreclaim(*cands);
+                                       *cands = COLnew(0, TYPE_oid, (BUN) nr, 
TRANSIENT);
+                               }
+                               oid *c = append?Tloc((*cands), 0):NULL;
+                               while(snr) {
+                                       if (mnstr_readLng(lg->input_log, &nr) 
!= 1 ||
+                                           mnstr_readLng(lg->input_log, 
&offset) != 1) {
+                                               TRC_CRITICAL(GDK, "read 
failed\n");
+                                               return LOG_EOF;
+                                       }
+                                       size_t tlen = lg->rbufsize;
+                                       void *t = rt(lg->rbuf, &tlen, 
lg->input_log, 1);
+                                       if (t == NULL) {
+                                               TRC_CRITICAL(GDK, "read 
failed\n");
+                                               return LOG_EOF;
+                                       } else if (append) {
+                                               lg->rbuf = t;
+                                               lg->rbufsize = tlen;
+                                               for (BUN p = 0; p < (BUN) nr; 
p++)
+                                                       *c++ = offset++;
+                                       }
+                                       snr -= nr;
+                               }
+                               if (append) {
+                                       BATsetcount( *cands, total );
+                                       (*cands)->tnonil = true;
+                                       (*cands)->tnil = false;
+                                       (*cands)->tseqbase = oid_nil;
+                                       (*cands)->tkey = true;
+                                       (*cands)->tsorted = true;
+                                       (*cands)->trevsorted = false;
+                                       (*cands)->tnorevsorted = 0;
+                                       (*cands)->tmaxpos = (*cands)->tminpos = 
BUN_NONE;
+                               }
+                               return res;
+                       }
                }
 
                if (!lg->flushing && !skip_entry) {
@@ -486,6 +538,58 @@ log_read_updates(logger *lg, trans *tr, 
                                        }
                                }
                        }
+               } else if (l->flag == LOG_UPDATE_CB) {
+                       size_t snr = (size_t) nr;
+
+                       uid = COLnew(0, TYPE_oid, (BUN) nr, TRANSIENT);
+                       if (r && uid == NULL) {
+                               if (r)
+                                       BBPreclaim(r);
+                               return LOG_ERR;
+                       }
+                       oid *c = uid?Tloc(uid, 0):NULL;
+                       BUN total = snr;
+                       while(snr) {
+                               if (mnstr_readLng(lg->input_log, &nr) != 1 ||
+                                   mnstr_readLng(lg->input_log, &offset) != 1) 
{
+                                       if (r)
+                                               BBPreclaim(r);
+                                       TRC_CRITICAL(GDK, "read failed\n");
+                                       return LOG_EOF;
+                               }
+                               size_t tlen = lg->rbufsize;
+                               void *t = rt(lg->rbuf, &tlen, lg->input_log, 1);
+                               if (t == NULL) {
+                                       TRC_CRITICAL(GDK, "read failed\n");
+                                       res = LOG_EOF;
+                               } else {
+                                       lg->rbuf = t;
+                                       lg->rbufsize = tlen;
+                                       if (r) {
+                                               for (BUN p = 0; p < (BUN) nr; 
p++) {
+                                                       if (BUNappend(r, t, 
true) != GDK_SUCCEED) {
+                                                               
TRC_CRITICAL(GDK, "append to bat failed\n");
+                                                               res = LOG_ERR;
+                                                       }
+                                                       *c++ = offset++;
+                                               }
+                                       }
+                               }
+                               snr -= nr;
+                       }
+                       if (uid) {
+                               BATsetcount( uid, total );
+                               uid->tnonil = true;
+                               uid->tnil = false;
+                               uid->tseqbase = oid_nil;
+                               uid->tkey = true;
+                               uid->tsorted = true;
+                               uid->trevsorted = false;
+                               uid->tnorevsorted = 0;
+                               uid->tmaxpos = uid->tminpos = BUN_NONE;
+                       }
+                       offset -= pnr;
+                       /* change into */
                } else if (l->flag == LOG_UPDATE_BULK) {
                        if (mnstr_readLng(lg->input_log, &offset) != 1) {
                                if (r)
@@ -661,7 +765,10 @@ log_read_updates(logger *lg, trans *tr, 
                                                tr->changes[tr->nr].type = 
LOG_UPDATE;
                                        }
                                }
-                               if (l->flag == LOG_UPDATE_CONST) {
+                               if (uid && l->flag == LOG_UPDATE_CB) {
+                                       assert(!cands); /* TODO: This might 
change in the future. */
+                                       tr->changes[tr->nr].type = LOG_UPDATE;
+                               } else if (l->flag == LOG_UPDATE_CONST || 
l->flag == LOG_UPDATE_CB) {
                                        assert(!cands); /* TODO: This might 
change in the future. */
                                        tr->changes[tr->nr].type = 
LOG_UPDATE_BULK;
                                }
@@ -792,7 +899,6 @@ la_bat_updates(logger *lg, logaction *la
                                                        b->tmaxpos = BUN_NONE;
                                                b->tkey = false;
                                                b->tsorted = false;
-                                               b->tkey = false;
                                                if (BUNreplace(b, q, t, true) 
!= GDK_SUCCEED) {
                                                        logbat_destroy(b);
                                                        bat_iterator_end(&vi);
@@ -1335,6 +1441,7 @@ log_read_transaction(logger *lg, BAT *id
                }
                skip_entry = (ids_to_omit && BUNfnd(ids_to_omit, &l.id) != 
BUN_NONE);
                switch (l.flag) {
+               case LOG_UPDATE_CB:
                case LOG_UPDATE_CONST:
                case LOG_UPDATE_BULK:
                case LOG_UPDATE:
@@ -1417,6 +1524,7 @@ log_read_transaction(logger *lg, BAT *id
                case LOG_SEQ:
                        err = log_read_seq(lg, &l);
                        break;
+               case LOG_UPDATE_CB:
                case LOG_UPDATE_CONST:
                case LOG_UPDATE_BULK:
                case LOG_UPDATE:
@@ -2962,16 +3070,55 @@ log_sequence(logger *lg, int seq, lng *i
        return 0;
 }
 
-gdk_return
-log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng 
cnt)
+static gdk_return
+log_constant_bulk(logger *lg, int type, const void *val, log_id id, lng 
offset, lng cnt, lng total_cnt)
 {
        bte tpe = find_type(lg, type);
        gdk_return ok = GDK_SUCCEED;
-       logformat l;
-       lng nr;
-       l.flag = LOG_UPDATE_CONST;
-       l.id = id;
-       nr = cnt;
+       lng nr = cnt;
+
+       gdk_return(*wt) (const void *, stream *, size_t) = 
BATatoms[type].atomWrite;
+
+       assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
+       if (lg->total_cnt == 0) {
+               logformat l;
+               l.flag = LOG_UPDATE_CB;
+               l.id = id;
+               if (mnstr_errnr(lg->current->output_log) != MNSTR_NO__ERROR ||
+                   log_write_format(lg, &l) != GDK_SUCCEED ||
+                   !mnstr_writeLng(lg->current->output_log, total_cnt) ||
+                   mnstr_write(lg->current->output_log, &tpe, 1, 1) != 1) {
+                       ok = GDK_FAIL;
+                       goto bailout;
+               }
+       }
+       lg->total_cnt += cnt;
+       if (lg->total_cnt == total_cnt) /* This is the last part of this bat 
to be logged, so we can already reset total_cnt */
+               lg->total_cnt = 0;
+       if (!mnstr_writeLng(lg->current->output_log, cnt) ||
+           !mnstr_writeLng(lg->current->output_log, offset)) { /* offset = -1 
indicates bat was logged in parts */
+               ok = GDK_FAIL;
+               goto bailout;
+       }
+
+       ok = wt(val, lg->current->output_log, 1);
+
+       TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
+
+  bailout:
+       if (ok != GDK_SUCCEED) {
+               ATOMIC_DEC(&lg->current->refcount);
+               const char *err = mnstr_peek_error(lg->current->output_log);
+               TRC_CRITICAL(GDK, "write failed%s%s\n", err ? ": " : "", err ? 
err : "");
+       }
+       return ok;
+}
+
+gdk_return
+log_constant(logger *lg, int type, const void *val, log_id id, lng offset, lng 
cnt, lng total_cnt)
+{
+       lng nr = cnt;
+       gdk_return ok = GDK_SUCCEED;
 
        if (LOG_DISABLED(lg) || !nr) {
                /* logging is switched off */
@@ -2983,6 +3130,14 @@ log_constant(logger *lg, int type, const
                return ok;
        }
 
+       if (cnt != total_cnt)
+               return log_constant_bulk(lg, type, val, id, offset, cnt, 
total_cnt);
+
+       bte tpe = find_type(lg, type);
+       logformat l;
+       l.flag = LOG_UPDATE_CONST;
+       l.id = id;
+
        gdk_return(*wt) (const void *, stream *, size_t) = 
BATatoms[type].atomWrite;
 
        assert(mnstr_errnr(lg->current->output_log) == MNSTR_NO__ERROR);
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -55,7 +55,7 @@ gdk_export lng log_changes(logger *lg);
 gdk_export int log_sequence(logger *lg, int seq, lng *id);
 
 /* todo pass the transaction id */
-gdk_export gdk_return log_constant(logger *lg, int type, const void *val, 
log_id id, lng offset, lng cnt);
+gdk_export gdk_return log_constant(logger *lg, int type, const void *val, 
log_id id, lng offset, lng cnt, lng total_cnt);
 gdk_export gdk_return log_bat(logger *lg, BAT *b, log_id id, lng offset, lng 
cnt, lng total_cnt); /* log slice from b */
 gdk_export gdk_return log_bat_persists(logger *lg, BAT *b, log_id id);
 gdk_export gdk_return log_bat_transient(logger *lg, log_id id);
diff --git a/monetdb5/modules/atoms/Tests/xml10.maltest 
b/monetdb5/modules/atoms/Tests/xml10.maltest
--- a/monetdb5/modules/atoms/Tests/xml10.maltest
+++ b/monetdb5/modules/atoms/Tests/xml10.maltest
@@ -65,7 +65,7 @@ statement error
 io.print(l)
 
 statement ok
-ag:= xml.subaggr(te,g,e,true)
+ag:= aggr.subxmlaggr(te,g,e,true)
 
 statement error
 io.print(ag)
@@ -93,7 +93,7 @@ io.print("book construction")
 "book construction"
 
 statement ok
-be:= xml.aggr(ae)
+be:= aggr.xmlaggr(ae)
 
 query T rowsort
 io.print(be)
diff --git a/monetdb5/modules/atoms/batxml.c b/monetdb5/modules/atoms/batxml.c
--- a/monetdb5/modules/atoms/batxml.c
+++ b/monetdb5/modules/atoms/batxml.c
@@ -1183,10 +1183,10 @@ BATXMLgroup(xml *ret, const bat *bid)
        const char *err = NULL;
 
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-leave@monetdb.org

Reply via email to