Changeset: ce6a0877ea60 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ce6a0877ea60
Modified Files:
clients/Tests/MAL-signatures-hge.test
clients/Tests/MAL-signatures.test
monetdb5/modules/mal/pipeline.c
monetdb5/modules/mal/pipeline.h
monetdb5/modules/mal/pp_hash.c
monetdb5/modules/mal/pp_slicer.c
sql/backends/monet5/bin_partition_by_value.c
Branch: pp_hashjoin
Log Message:
Extended pipeline.counter to also accept a BAT[:any] parameter. In that case,
PPcounter first computes no_slices, the number of slices the BAT should be split into.
In this way, we can replace two MAL statements, e.g:
X_20:int := slicer.no_slices(X_10:bat[:oid]);
X_30:bat[:bte] := pipeline.counter(X_20:int, true:bit);
|
With one MAL statement:
X_30:bat[:bte] := pipeline.counter(X_10:bat[:oid], true:bit);
|
This is needed when we slice a BAT for a concat_block. During MAL plan
generation for such concat_blocks, we need to move some existing MAL statements
out of the pipeline block, but we can only move two or three of them.
NB: the code in PPcounter that computes no_slices is copied from
SLICERno_slices!
diffs (156 lines):
diff --git a/clients/Tests/MAL-signatures-hge.test
b/clients/Tests/MAL-signatures-hge.test
--- a/clients/Tests/MAL-signatures-hge.test
+++ b/clients/Tests/MAL-signatures-hge.test
@@ -50065,6 +50065,16 @@ PPconcat_block
Add source to concat
pipeline
counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1]):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1], X_1:bool):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
unsafe pattern pipeline.counter(X_0:int):bat[:bte]
PPcounter
return counter source
diff --git a/clients/Tests/MAL-signatures.test
b/clients/Tests/MAL-signatures.test
--- a/clients/Tests/MAL-signatures.test
+++ b/clients/Tests/MAL-signatures.test
@@ -38435,6 +38435,16 @@ PPconcat_block
Add source to concat
pipeline
counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1]):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1], X_1:bool):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
unsafe pattern pipeline.counter(X_0:int):bat[:bte]
PPcounter
return counter source
diff --git a/monetdb5/modules/mal/pipeline.c b/monetdb5/modules/mal/pipeline.c
--- a/monetdb5/modules/mal/pipeline.c
+++ b/monetdb5/modules/mal/pipeline.c
@@ -217,7 +217,32 @@ PPcounter(Client cntxt, MalBlkPtr mb, Ma
(void)cntxt;
(void)mb;
bat *rb = getArgReference_bat(stk, pci, 0);
- int nr = *getArgReference_int(stk, pci, 1);
+ int nr = 0;
+ int tpe = getArgType(mb, pci, 1);
+ if (isaBatType(tpe)) { /* get the BAT to compute nparts */
+ bat *cb = getArgReference_bat(stk, pci, 1);
+ BAT *b = BATdescriptor(*cb);
+ if (!b)
+ return createException(SQL, "pipeline.counter",
SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+ size_t cnt = 0;
+ hash_table *h = (hash_table*)b->tsink;
+ if (h && h->s.type == OA_HASH_TABLE_SINK) {
+ cnt = h->size;
+ } else {
+ cnt = BATcount(b);
+ }
+ BBPunfix(b->batCacheid);
+
+ if (cnt < SLICE_SIZE)
+ nr = 1;
+ else
+ nr = (int)((cnt+SLICE_SIZE-1)/SLICE_SIZE);
+ FORCEMITODEBUG
+ if (nr < GDKnr_threads)
+ nr = MIN((int)GDKnr_threads,(int)cnt);
+ } else { /* get nparts */
+ nr = *getArgReference_int(stk, pci, 1);
+ }
bool sync = false;
if (pci->argc == 3)
sync = *getArgReference_bit(stk, pci, 2);
@@ -771,11 +796,20 @@ static mel_func pipeline_init_funcs[] =
batarg("sink", bte),
arg("nr", int)
)),
+ pattern("pipeline", "counter", PPcounter, true, "return counter source",
args(1,2,
+ batarg("sink", bte),
+ batargany("col", 1) /* BAT to be sliced into nparts for the counter */
+ )),
pattern("pipeline", "counter", PPcounter, true, "return counter source",
args(1,3,
batarg("sink", bte),
arg("nr", int),
arg("sync", bool) /* sync (ie all workers need to call this
counter once, before any can continue) */
)),
+ pattern("pipeline", "counter", PPcounter, true, "return counter source",
args(1,3,
+ batarg("sink", bte),
+ batargany("col", 1), /* BAT to be sliced into nparts for the counter */
+ arg("sync", bool) /* sync (ie all workers need to call this
counter once, before any can continue) */
+ )),
pattern("pipeline", "counter_get", PPcounter_get, true, "return current
number from the counter", args(1,3,
arg("", int),
batarg("sink", bte),
diff --git a/monetdb5/modules/mal/pipeline.h b/monetdb5/modules/mal/pipeline.h
--- a/monetdb5/modules/mal/pipeline.h
+++ b/monetdb5/modules/mal/pipeline.h
@@ -20,6 +20,8 @@
#define pipeline_lock2(r) MT_lock_set(&r->theaplock)
#define pipeline_unlock2(r) MT_lock_unset(&r->theaplock)
+#define SLICE_SIZE 100000
+
// TODO a better way to define/add/register sinks, similar to types
#define OA_HASH_TABLE_SINK 1
#define OA_HASH_PAYLOAD_SINK 2
diff --git a/monetdb5/modules/mal/pp_hash.c b/monetdb5/modules/mal/pp_hash.c
--- a/monetdb5/modules/mal/pp_hash.c
+++ b/monetdb5/modules/mal/pp_hash.c
@@ -3262,8 +3262,6 @@ error:
return err;
}
-#define SLICE_SIZE 100000
-
static str
OAHASHno_slices(Client ctx, int *no_slices, bat *ht_sink)
{
diff --git a/monetdb5/modules/mal/pp_slicer.c b/monetdb5/modules/mal/pp_slicer.c
--- a/monetdb5/modules/mal/pp_slicer.c
+++ b/monetdb5/modules/mal/pp_slicer.c
@@ -157,7 +157,6 @@ error:
return msg;
}
-#define SLICE_SIZE 100000
static str
SLICERnth_slice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
diff --git a/sql/backends/monet5/bin_partition_by_value.c
b/sql/backends/monet5/bin_partition_by_value.c
--- a/sql/backends/monet5/bin_partition_by_value.c
+++ b/sql/backends/monet5/bin_partition_by_value.c
@@ -128,6 +128,19 @@ rel2bin_slicer_pp(backend *be, stmt *sub
stmt *mat = sub->op4.lval->h->data;
int nrparts = mat_nr_parts(be, mat->nr);
source = pp_counter(be, -1, nrparts, false);
+ } else if (be->pp) {
+ if (sub && sub->cand)
+ sub = subrel_project(be, sub, NULL, NULL);
+ // FIXME: a better way to determine if 'sub' is an oahash-table.
sub->op1 is the hash-payload!
+ node *n = sub->op1 ? sub->op4.lval->t : sub->op4.lval->h; /* for
hash get last */
+ stmt *sc = n->data;
+
+ if (sc->nrcols == 0) {
+ source = pp_counter(be, 1, -1, false);
+ } else {
+ //sc = stmt_no_slices(be, sc, sub->op1?true:false /* hash
table on op1 */);
+ source = pp_counter(be, -1, sc->nr, false);
+ }
} else {
source = pp_counter(be, -1, pp_dynamic_slices(be, sub), false);
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]