Changeset: c12d0d4f1ffd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/c12d0d4f1ffd
Modified Files:
        clients/Tests/MAL-signatures-hge.test
        gdk/gdk.h
        gdk/gdk_system.h
        monetdb5/mal/mal_pipelines.c
        monetdb5/modules/mal/pipeline.c
        sql/backends/monet5/bin_partition.c
        sql/backends/monet5/bin_partition.h
        sql/backends/monet5/bin_partition_by_slice.c
        sql/backends/monet5/bin_partition_by_value.c
        sql/backends/monet5/generator/generator.c
        sql/backends/monet5/mal_backend.h
        sql/backends/monet5/rel_bin.c
        sql/backends/monet5/rel_physical.c
        sql/backends/monet5/sql.c
        sql/backends/monet5/sql_pp_statement.c
        sql/backends/monet5/sql_pp_statement.h
        sql/common/sql_types.c
        sql/include/sql_catalog.h
        sql/storage/store.c
        sql/test/sql_dump/Tests/dump.test
Branch: pp_hashjoin
Log Message:

initial support for pipelines and generators


diffs (truncated from 998 to 300 lines):

diff --git a/clients/Tests/MAL-signatures-hge.test b/clients/Tests/MAL-signatures-hge.test
--- a/clients/Tests/MAL-signatures-hge.test
+++ b/clients/Tests/MAL-signatures-hge.test
@@ -45949,6 +45949,91 @@ pattern generator.join(X_0:bat[:sht], X_
 VLTgenerator_join
 (empty)
 generator
+new
+pattern generator.new(X_0:bte, X_1:bte):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:bte, X_1:bte, X_2:bte):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:date, X_1:date, X_2:int):bat[:bte]
+VLTgenerator_new
+date generator with step size in months
+generator
+new
+pattern generator.new(X_0:date, X_1:date, X_2:lng):bat[:bte]
+VLTgenerator_new
+date generator with step size in days
+generator
+new
+pattern generator.new(X_0:dbl, X_1:dbl):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:dbl, X_1:dbl, X_2:dbl):bat[:bte]
+VLTgenerator_new
+Create and materialize a generator table
+generator
+new
+pattern generator.new(X_0:flt, X_1:flt):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:flt, X_1:flt, X_2:flt):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:hge, X_1:hge):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:hge, X_1:hge, X_2:hge):bat[:bte]
+VLTgenerator_new
+Create and materialize a generator table
+generator
+new
+pattern generator.new(X_0:int, X_1:int):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:int, X_1:int, X_2:int):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:lng, X_1:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:lng, X_1:lng, X_2:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:sht, X_1:sht):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:sht, X_1:sht, X_2:sht):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:timestamp, X_1:timestamp, X_2:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
 parameters
 pattern generator.parameters(X_0:bte, X_1:bte):bat[:bte]
 VLTgenerator_noop
@@ -50333,6 +50418,11 @@ mproject
 pattern sort.mproject(X_0:bat[:int], X_1:bat[:any_1], X_2:bat[:any_1], X_3:ptr):bat[:any_1]
 PPmproject
 (empty)
+source
+next
+pattern source.next(X_0:bat[:any_2]):bat[:any_1]
+source_next
+return next part
 sql
 affectedRows
 unsafe pattern sql.affectedRows(X_0:int, X_1:lng):int
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -338,9 +338,13 @@ typedef struct PROPrec PROPrec;
 
 typedef void (*sink_destroy)(void *sink);
 typedef int (*sink_done)(void *sink, int wid, int nr_workers, bool redo);
+typedef int (*sink_next)(void *sink, int wid);
+typedef void *(*sink_next_bat)(void *sink, int wid);
 typedef struct Sink {
        sink_destroy destroy;
        sink_done done;
+       sink_next next; /* counter incrementing sources */
+       sink_next_bat next_bat; /* bat generating sources */
        int type;               /* sink/source type */
        char *error;
 } Sink;
diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h
--- a/gdk/gdk_system.h
+++ b/gdk/gdk_system.h
@@ -152,6 +152,7 @@ typedef int64_t lng;
 typedef struct QryCtx {
        lng starttime;
        lng endtime;
+       int wid;
        struct bstream *bs;
        ATOMIC_TYPE datasize;
        ATOMIC_BASE_TYPE maxmem;
diff --git a/monetdb5/mal/mal_pipelines.c b/monetdb5/mal/mal_pipelines.c
--- a/monetdb5/mal/mal_pipelines.c
+++ b/monetdb5/mal/mal_pipelines.c
@@ -215,6 +215,8 @@ PIPELINEworker(void *T)
                                .seqnr = -1,
                                .wls = NULL,
                        };
+                       QryCtx *qc = MT_thread_get_qry_ctx();
+                       qc->wid = p->wid;
                        stk->stk[s->mb->stmt[s->start]->argv[1]].val.ival = PIPELINEnext_counter(p);
                        stk->stk[s->mb->stmt[s->start]->argv[2]].val.pval = p;
                        /* the maxparts (arg 3) is generated ie constant value on the stack */
diff --git a/monetdb5/modules/mal/pipeline.c b/monetdb5/modules/mal/pipeline.c
--- a/monetdb5/modules/mal/pipeline.c
+++ b/monetdb5/modules/mal/pipeline.c
@@ -737,6 +737,34 @@ PPappend(Client cntxt, MalBlkPtr mb, Mal
        return MAL_SUCCEED;
 }
 
+static str
+source_next(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void)mb;
+       (void)cntxt;
+
+       bat *res = getArgReference_bat(stk, pci, 0);
+       bat sb = *getArgReference_bat(stk, pci, 1);
+       BAT *s = BATdescriptor(sb);
+
+       if (!s)
+               throw(MAL, "source.next", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+       Sink *src = s->tsink;
+       if (!src || !src->next_bat) {
+               BBPreclaim(s);
+               throw(MAL, "source.next", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+       }
+       QryCtx *qc = MT_thread_get_qry_ctx();
+       BAT *r = src->next_bat(src, qc->wid);
+       if (!r) {
+               BBPreclaim(s);
+               throw(SQL, "source.next",  SQLSTATE(HY013) MAL_MALLOC_FAIL);
+       }
+       *res = r->batCacheid;
+       BBPkeepref(r);
+       return MAL_SUCCEED;
+}
+
 #include "mel.h"
 static mel_func pipeline_init_funcs[] = {
  pattern("pipeline", "counter", PPcounter, true, "return counter source", args(1,2,
@@ -810,6 +838,11 @@ static mel_func pipeline_init_funcs[] = 
         arg("", bit),
         arg("handle", ptr), argany("mailbox",1), arg("channel",int), argany("value", 1)
  )),
+
+ pattern("source", "next", source_next, false, "return next part", args(1,2,
+        batargany("res", 1),
+        batargany("source", 2)
+ )),
  { .imp=NULL }
 };
 #include "mal_import.h"
diff --git a/sql/backends/monet5/bin_partition.c b/sql/backends/monet5/bin_partition.c
--- a/sql/backends/monet5/bin_partition.c
+++ b/sql/backends/monet5/bin_partition.c
@@ -7,71 +7,3 @@
  *
  * For copyright information, see the file debian/copyright.
  */
-
-/*
- * This file contains shared functions for bin_partition_by_value and
- * bin_partition_by_slice
- */
-
-#include "monetdb_config.h"
-
-#include "bin_partition.h"
-#include "rel_rel.h"
-#include "rel_rewriter.h"
-
-static sql_rel*
-rel_is_not_pp_safe(visitor *v, sql_rel *rel)
-{
-       if (is_groupby(rel->op) ||
-           rel->op == op_table ||
-               rel->op == op_topn ||
-               rel->oahash ||
-               ((is_join(rel->op) || is_semi(rel->op)) && rel->spb)) {
-               assert(0);
-               *((int*)v->data) = 1;
-       }
-       return rel;
-}
-
-bool
-pp_can_not_start(mvc *sql, sql_rel *rel)
-{
-       int set = 0;
-       visitor v = { .sql = sql, .data = &set };
-       rel = rel_visitor_bottomup(&v, rel, &rel_is_not_pp_safe);
-       return set;
-}
-
-/* Computes the number of slices we going to horizontally divide a table into.
- */
-#define PP_MIN_SIZE (64*1024)
-#define PP_MAX_SIZE (128*1024)
-int
-pp_nr_slices(sql_rel *rel)
-{
-       BUN est = get_rel_count(rel);
-
-       if (est == BUN_NONE || (ulng) est > (ulng) GDK_lng_max)
-               est = 85000000;
-
-       int nr_slices = 1;
-
-       if (est < PP_MIN_SIZE)
-               nr_slices = 1;
-       else if (est/GDKnr_threads < PP_MIN_SIZE)
-               nr_slices = (int)(est/PP_MIN_SIZE);
-       else
-               nr_slices = (int)(est/PP_MAX_SIZE);
-       FORCEMITODEBUG
-       if (nr_slices < GDKnr_threads)
-               nr_slices = GDKnr_threads;
-       FORCEMITODEBUG
-       if (GDKnr_threads == 1)
-               nr_slices = 8;
-
-       if (nr_slices == 0)
-               return 1;
-       assert(nr_slices > 0);
-       return nr_slices;
-}
-
diff --git a/sql/backends/monet5/bin_partition.h b/sql/backends/monet5/bin_partition.h
--- a/sql/backends/monet5/bin_partition.h
+++ b/sql/backends/monet5/bin_partition.h
@@ -13,8 +13,6 @@
 
 #include "mal_backend.h"
 
-extern bool pp_can_not_start(mvc *sql, sql_rel *rel);
-
-extern int pp_nr_slices(sql_rel *rel);
+#define PARTITION_NRPARTS 256
 
 #endif /*_BIN_PARTITION_H_*/
diff --git a/sql/backends/monet5/bin_partition_by_slice.c b/sql/backends/monet5/bin_partition_by_slice.c
--- a/sql/backends/monet5/bin_partition_by_slice.c
+++ b/sql/backends/monet5/bin_partition_by_slice.c
@@ -339,7 +339,7 @@ rel_groupby_prepare_pp(list **aggrresult
                        if (card > INT_MAX)
                                card = INT_MAX;
 
-                       stmt *s = stmt_oahash_new(be, t, nrparts?256:card, curhash, nrparts);
+                       stmt *s = stmt_oahash_new(be, t, nrparts?nrparts:card, curhash, nrparts);
                        if (s == NULL)
                                return NULL;
                        curhash = s->nr;
@@ -414,21 +414,21 @@ rel_groupby_prepare_pp(list **aggrresult
                        if (avg && EC_APPNUM(t->type->eclass) && it && !EC_APPNUM(it->type->eclass))
                                t = it;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to