Changeset: c12d0d4f1ffd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/c12d0d4f1ffd
Modified Files:
clients/Tests/MAL-signatures-hge.test
gdk/gdk.h
gdk/gdk_system.h
monetdb5/mal/mal_pipelines.c
monetdb5/modules/mal/pipeline.c
sql/backends/monet5/bin_partition.c
sql/backends/monet5/bin_partition.h
sql/backends/monet5/bin_partition_by_slice.c
sql/backends/monet5/bin_partition_by_value.c
sql/backends/monet5/generator/generator.c
sql/backends/monet5/mal_backend.h
sql/backends/monet5/rel_bin.c
sql/backends/monet5/rel_physical.c
sql/backends/monet5/sql.c
sql/backends/monet5/sql_pp_statement.c
sql/backends/monet5/sql_pp_statement.h
sql/common/sql_types.c
sql/include/sql_catalog.h
sql/storage/store.c
sql/test/sql_dump/Tests/dump.test
Branch: pp_hashjoin
Log Message:
initial support for pipelines and generators
diffs (truncated from 998 to 300 lines):
diff --git a/clients/Tests/MAL-signatures-hge.test b/clients/Tests/MAL-signatures-hge.test
--- a/clients/Tests/MAL-signatures-hge.test
+++ b/clients/Tests/MAL-signatures-hge.test
@@ -45949,6 +45949,91 @@ pattern generator.join(X_0:bat[:sht], X_
VLTgenerator_join
(empty)
generator
+new
+pattern generator.new(X_0:bte, X_1:bte):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:bte, X_1:bte, X_2:bte):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:date, X_1:date, X_2:int):bat[:bte]
+VLTgenerator_new
+date generator with step size in months
+generator
+new
+pattern generator.new(X_0:date, X_1:date, X_2:lng):bat[:bte]
+VLTgenerator_new
+date generator with step size in days
+generator
+new
+pattern generator.new(X_0:dbl, X_1:dbl):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:dbl, X_1:dbl, X_2:dbl):bat[:bte]
+VLTgenerator_new
+Create and materialize a generator table
+generator
+new
+pattern generator.new(X_0:flt, X_1:flt):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:flt, X_1:flt, X_2:flt):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:hge, X_1:hge):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:hge, X_1:hge, X_2:hge):bat[:bte]
+VLTgenerator_new
+Create and materialize a generator table
+generator
+new
+pattern generator.new(X_0:int, X_1:int):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:int, X_1:int, X_2:int):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:lng, X_1:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:lng, X_1:lng, X_2:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:sht, X_1:sht):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:sht, X_1:sht, X_2:sht):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
+new
+pattern generator.new(X_0:timestamp, X_1:timestamp, X_2:lng):bat[:bte]
+VLTgenerator_new
+(empty)
+generator
parameters
pattern generator.parameters(X_0:bte, X_1:bte):bat[:bte]
VLTgenerator_noop
@@ -50333,6 +50418,11 @@ mproject
pattern sort.mproject(X_0:bat[:int], X_1:bat[:any_1], X_2:bat[:any_1],
X_3:ptr):bat[:any_1]
PPmproject
(empty)
+source
+next
+pattern source.next(X_0:bat[:any_2]):bat[:any_1]
+source_next
+return next part
sql
affectedRows
unsafe pattern sql.affectedRows(X_0:int, X_1:lng):int
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -338,9 +338,13 @@ typedef struct PROPrec PROPrec;
typedef void (*sink_destroy)(void *sink);
typedef int (*sink_done)(void *sink, int wid, int nr_workers, bool redo);
+typedef int (*sink_next)(void *sink, int wid);
+typedef void *(*sink_next_bat)(void *sink, int wid);
typedef struct Sink {
sink_destroy destroy;
sink_done done;
+ sink_next next; /* counter incrementing sources; signature (void *sink, int wid), returns int */
+ sink_next_bat next_bat; /* bat generating sources; signature (void *sink, int wid), returns void * which source.next uses as a BAT * */
int type; /* sink/source type */
char *error;
} Sink;
diff --git a/gdk/gdk_system.h b/gdk/gdk_system.h
--- a/gdk/gdk_system.h
+++ b/gdk/gdk_system.h
@@ -152,6 +152,7 @@ typedef int64_t lng;
typedef struct QryCtx {
lng starttime;
lng endtime;
+ int wid;
struct bstream *bs;
ATOMIC_TYPE datasize;
ATOMIC_BASE_TYPE maxmem;
diff --git a/monetdb5/mal/mal_pipelines.c b/monetdb5/mal/mal_pipelines.c
--- a/monetdb5/mal/mal_pipelines.c
+++ b/monetdb5/mal/mal_pipelines.c
@@ -215,6 +215,8 @@ PIPELINEworker(void *T)
.seqnr = -1,
.wls = NULL,
};
+ QryCtx *qc = MT_thread_get_qry_ctx();
+ qc->wid = p->wid;
stk->stk[s->mb->stmt[s->start]->argv[1]].val.ival =
PIPELINEnext_counter(p);
stk->stk[s->mb->stmt[s->start]->argv[2]].val.pval = p;
/* the maxparts (arg 3) is generated ie constant value
on the stack */
diff --git a/monetdb5/modules/mal/pipeline.c b/monetdb5/modules/mal/pipeline.c
--- a/monetdb5/modules/mal/pipeline.c
+++ b/monetdb5/modules/mal/pipeline.c
@@ -737,6 +737,34 @@ PPappend(Client cntxt, MalBlkPtr mb, Mal
return MAL_SUCCEED;
}
+/*
+ * source.next: fetch the next part from a BAT-generating source.
+ *
+ * Argument 1 is a BAT whose tsink holds the Sink; its next_bat callback
+ * is invoked with the worker id stored in the calling thread's query
+ * context, and the BAT it returns is handed back through argument 0.
+ *
+ * Errors:
+ *   HY002 - the source BAT cannot be found, or it has no sink/next_bat
+ *   HY000 - the calling thread has no query context
+ *   HY013 - next_bat returned NULL
+ */
+static str
+source_next(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	(void)mb;
+	(void)cntxt;
+
+	bat *res = getArgReference_bat(stk, pci, 0);
+	bat sb = *getArgReference_bat(stk, pci, 1);
+	BAT *s = BATdescriptor(sb);
+
+	if (!s)
+		throw(MAL, "source.next", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+	Sink *src = s->tsink;
+	if (!src || !src->next_bat) {
+		BBPreclaim(s);
+		throw(MAL, "source.next", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+	}
+	/* the worker id lives in the thread's query context; refuse to
+	 * dereference a missing context instead of crashing */
+	QryCtx *qc = MT_thread_get_qry_ctx();
+	if (!qc) {
+		BBPreclaim(s);
+		throw(MAL, "source.next", SQLSTATE(HY000) "no query context");
+	}
+	BAT *r = src->next_bat(src, qc->wid);
+	/* the descriptor was only needed to reach the sink: release the
+	 * reference taken by BATdescriptor on every path, success included */
+	BBPreclaim(s);
+	if (!r)
+		throw(SQL, "source.next", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+	*res = r->batCacheid;
+	BBPkeepref(r);
+	return MAL_SUCCEED;
+}
+
#include "mel.h"
static mel_func pipeline_init_funcs[] = {
pattern("pipeline", "counter", PPcounter, true, "return counter source",
args(1,2,
@@ -810,6 +838,11 @@ static mel_func pipeline_init_funcs[] =
arg("", bit),
arg("handle", ptr), argany("mailbox",1), arg("channel",int),
argany("value", 1)
)),
+
+ pattern("source", "next", source_next, false, "return next part", args(1,2,
+ batargany("res", 1),
+ batargany("source", 2)
+ )),
{ .imp=NULL }
};
#include "mal_import.h"
diff --git a/sql/backends/monet5/bin_partition.c b/sql/backends/monet5/bin_partition.c
--- a/sql/backends/monet5/bin_partition.c
+++ b/sql/backends/monet5/bin_partition.c
@@ -7,71 +7,3 @@
*
* For copyright information, see the file debian/copyright.
*/
-
-/*
- * This file contains shared functions for bin_partition_by_value and
- * bin_partition_by_slice
- */
-
-#include "monetdb_config.h"
-
-#include "bin_partition.h"
-#include "rel_rel.h"
-#include "rel_rewriter.h"
-
-static sql_rel*
-rel_is_not_pp_safe(visitor *v, sql_rel *rel)
-{
- if (is_groupby(rel->op) ||
- rel->op == op_table ||
- rel->op == op_topn ||
- rel->oahash ||
- ((is_join(rel->op) || is_semi(rel->op)) && rel->spb)) {
- assert(0);
- *((int*)v->data) = 1;
- }
- return rel;
-}
-
-bool
-pp_can_not_start(mvc *sql, sql_rel *rel)
-{
- int set = 0;
- visitor v = { .sql = sql, .data = &set };
- rel = rel_visitor_bottomup(&v, rel, &rel_is_not_pp_safe);
- return set;
-}
-
-/* Computes the number of slices we going to horizontally divide a table into.
- */
-#define PP_MIN_SIZE (64*1024)
-#define PP_MAX_SIZE (128*1024)
-int
-pp_nr_slices(sql_rel *rel)
-{
- BUN est = get_rel_count(rel);
-
- if (est == BUN_NONE || (ulng) est > (ulng) GDK_lng_max)
- est = 85000000;
-
- int nr_slices = 1;
-
- if (est < PP_MIN_SIZE)
- nr_slices = 1;
- else if (est/GDKnr_threads < PP_MIN_SIZE)
- nr_slices = (int)(est/PP_MIN_SIZE);
- else
- nr_slices = (int)(est/PP_MAX_SIZE);
- FORCEMITODEBUG
- if (nr_slices < GDKnr_threads)
- nr_slices = GDKnr_threads;
- FORCEMITODEBUG
- if (GDKnr_threads == 1)
- nr_slices = 8;
-
- if (nr_slices == 0)
- return 1;
- assert(nr_slices > 0);
- return nr_slices;
-}
-
diff --git a/sql/backends/monet5/bin_partition.h b/sql/backends/monet5/bin_partition.h
--- a/sql/backends/monet5/bin_partition.h
+++ b/sql/backends/monet5/bin_partition.h
@@ -13,8 +13,6 @@
#include "mal_backend.h"
-extern bool pp_can_not_start(mvc *sql, sql_rel *rel);
-
-extern int pp_nr_slices(sql_rel *rel);
+#define PARTITION_NRPARTS 256
#endif /*_BIN_PARTITION_H_*/
diff --git a/sql/backends/monet5/bin_partition_by_slice.c b/sql/backends/monet5/bin_partition_by_slice.c
--- a/sql/backends/monet5/bin_partition_by_slice.c
+++ b/sql/backends/monet5/bin_partition_by_slice.c
@@ -339,7 +339,7 @@ rel_groupby_prepare_pp(list **aggrresult
if (card > INT_MAX)
card = INT_MAX;
- stmt *s = stmt_oahash_new(be, t, nrparts?256:card,
curhash, nrparts);
+ stmt *s = stmt_oahash_new(be, t, nrparts?nrparts:card,
curhash, nrparts);
if (s == NULL)
return NULL;
curhash = s->nr;
@@ -414,21 +414,21 @@ rel_groupby_prepare_pp(list **aggrresult
if (avg && EC_APPNUM(t->type->eclass) && it &&
!EC_APPNUM(it->type->eclass))
t = it;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]