Changeset: ce6a0877ea60 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ce6a0877ea60
Modified Files:
        clients/Tests/MAL-signatures-hge.test
        clients/Tests/MAL-signatures.test
        monetdb5/modules/mal/pipeline.c
        monetdb5/modules/mal/pipeline.h
        monetdb5/modules/mal/pp_hash.c
        monetdb5/modules/mal/pp_slicer.c
        sql/backends/monet5/bin_partition_by_value.c
Branch: pp_hashjoin
Log Message:

Extended pipeline.counter to also accept a BAT[:any] parameter. When given a
BAT, PPcounter first computes the number of slices (no_slices) into which that
BAT should be sliced, and uses that as the counter's number of parts.

In this way, we can replace two MAL statements, e.g.:
        X_20:int := slicer.no_slices(X_10:bat[:oid]);
        X_30:bat[:bte] := pipeline.counter(X_20:int, true:bit);

with one MAL statement:
        X_30:bat[:bte] := pipeline.counter(X_10:bat[:oid], true:bit);

This is needed when we slice a BAT for a concat_block. During MAL plan
generation for such concat_blocks, we need to move some existing MAL statements
out of the pipeline block, but the number of statements we can move is limited
to two or three, so folding the no_slices computation into pipeline.counter
saves one of them.

NB: the code in PPcounter that computes no_slices is copied from
SLICERno_slices!


diffs (156 lines):

diff --git a/clients/Tests/MAL-signatures-hge.test 
b/clients/Tests/MAL-signatures-hge.test
--- a/clients/Tests/MAL-signatures-hge.test
+++ b/clients/Tests/MAL-signatures-hge.test
@@ -50065,6 +50065,16 @@ PPconcat_block
 Add source to concat
 pipeline
 counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1]):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1], X_1:bool):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
 unsafe pattern pipeline.counter(X_0:int):bat[:bte]
 PPcounter
 return counter source
diff --git a/clients/Tests/MAL-signatures.test 
b/clients/Tests/MAL-signatures.test
--- a/clients/Tests/MAL-signatures.test
+++ b/clients/Tests/MAL-signatures.test
@@ -38435,6 +38435,16 @@ PPconcat_block
 Add source to concat
 pipeline
 counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1]):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
+unsafe pattern pipeline.counter(X_0:bat[:any_1], X_1:bool):bat[:bte]
+PPcounter
+return counter source
+pipeline
+counter
 unsafe pattern pipeline.counter(X_0:int):bat[:bte]
 PPcounter
 return counter source
diff --git a/monetdb5/modules/mal/pipeline.c b/monetdb5/modules/mal/pipeline.c
--- a/monetdb5/modules/mal/pipeline.c
+++ b/monetdb5/modules/mal/pipeline.c
@@ -217,7 +217,32 @@ PPcounter(Client cntxt, MalBlkPtr mb, Ma
        (void)cntxt;
        (void)mb;
        bat *rb = getArgReference_bat(stk, pci, 0);
-       int nr = *getArgReference_int(stk, pci, 1);
+       int nr = 0;
+       int tpe = getArgType(mb, pci, 1);
+       if (isaBatType(tpe)) { /* get the BAT to compute nparts */
+               bat *cb = getArgReference_bat(stk, pci, 1);
+               BAT *b = BATdescriptor(*cb);
+               if (!b)
+                       return createException(SQL, "pipeline.counter", 
SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
+               size_t cnt = 0;
+               hash_table *h = (hash_table*)b->tsink;
+               if (h && h->s.type == OA_HASH_TABLE_SINK) {
+                       cnt = h->size;
+               } else {
+                       cnt = BATcount(b);
+               }
+               BBPunfix(b->batCacheid);
+
+               if (cnt < SLICE_SIZE)
+                       nr = 1;
+               else
+                       nr = (int)((cnt+SLICE_SIZE-1)/SLICE_SIZE);
+               FORCEMITODEBUG
+               if (nr < GDKnr_threads)
+                       nr = MIN((int)GDKnr_threads,(int)cnt);
+       } else { /* get nparts */
+               nr = *getArgReference_int(stk, pci, 1);
+       }
        bool sync = false;
        if (pci->argc == 3)
                sync = *getArgReference_bit(stk, pci, 2);
@@ -771,11 +796,20 @@ static mel_func pipeline_init_funcs[] = 
         batarg("sink", bte),
         arg("nr", int)
  )),
+ pattern("pipeline", "counter", PPcounter, true, "return counter source", 
args(1,2,
+        batarg("sink", bte),
+        batargany("col", 1) /* BAT to be sliced into nparts for the counter */
+ )),
  pattern("pipeline", "counter", PPcounter, true, "return counter source", 
args(1,3,
         batarg("sink", bte),
         arg("nr", int),
         arg("sync", bool)      /* sync (ie all workers need to call this 
counter once, before any can continue) */
  )),
+ pattern("pipeline", "counter", PPcounter, true, "return counter source", 
args(1,3,
+        batarg("sink", bte),
+        batargany("col", 1), /* BAT to be sliced into nparts for the counter */
+        arg("sync", bool)      /* sync (ie all workers need to call this 
counter once, before any can continue) */
+ )),
  pattern("pipeline", "counter_get", PPcounter_get, true, "return current 
number from the counter", args(1,3,
         arg("", int),
         batarg("sink", bte),
diff --git a/monetdb5/modules/mal/pipeline.h b/monetdb5/modules/mal/pipeline.h
--- a/monetdb5/modules/mal/pipeline.h
+++ b/monetdb5/modules/mal/pipeline.h
@@ -20,6 +20,8 @@
 #define pipeline_lock2(r) MT_lock_set(&r->theaplock)
 #define pipeline_unlock2(r) MT_lock_unset(&r->theaplock)
 
+#define SLICE_SIZE 100000
+
 // TODO a better way to define/add/register sinks, similar to types
 #define OA_HASH_TABLE_SINK 1
 #define OA_HASH_PAYLOAD_SINK 2
diff --git a/monetdb5/modules/mal/pp_hash.c b/monetdb5/modules/mal/pp_hash.c
--- a/monetdb5/modules/mal/pp_hash.c
+++ b/monetdb5/modules/mal/pp_hash.c
@@ -3262,8 +3262,6 @@ error:
        return err;
 }
 
-#define SLICE_SIZE 100000
-
 static str
 OAHASHno_slices(Client ctx, int *no_slices, bat *ht_sink)
 {
diff --git a/monetdb5/modules/mal/pp_slicer.c b/monetdb5/modules/mal/pp_slicer.c
--- a/monetdb5/modules/mal/pp_slicer.c
+++ b/monetdb5/modules/mal/pp_slicer.c
@@ -157,7 +157,6 @@ error:
        return msg;
 }
 
-#define SLICE_SIZE 100000
 static str
 SLICERnth_slice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
diff --git a/sql/backends/monet5/bin_partition_by_value.c 
b/sql/backends/monet5/bin_partition_by_value.c
--- a/sql/backends/monet5/bin_partition_by_value.c
+++ b/sql/backends/monet5/bin_partition_by_value.c
@@ -128,6 +128,19 @@ rel2bin_slicer_pp(backend *be, stmt *sub
                stmt *mat = sub->op4.lval->h->data;
                int nrparts = mat_nr_parts(be, mat->nr);
                source = pp_counter(be, -1, nrparts, false);
+    } else if (be->pp) {
+            if (sub && sub->cand)
+                sub = subrel_project(be, sub, NULL, NULL);
+            // FIXME: a better way to determine if 'sub' is an oahash-table. 
sub->op1 is the hash-payload!
+            node *n = sub->op1 ? sub->op4.lval->t : sub->op4.lval->h; /* for 
hash get last */
+            stmt *sc = n->data;
+
+            if (sc->nrcols == 0) {
+                source = pp_counter(be, 1, -1, false);
+            } else {
+                //sc = stmt_no_slices(be, sc, sub->op1?true:false /* hash 
table on op1 */);
+                source = pp_counter(be, -1, sc->nr, false);
+            }
        } else {
                source = pp_counter(be, -1, pp_dynamic_slices(be, sub), false);
        }
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to