Changeset: 94a6c138fe70 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=94a6c138fe70
Modified Files:
        clients/Tests/MAL-signatures.stable.out
        clients/Tests/MAL-signatures.stable.out.int128
        clients/Tests/exports.stable.out
        monetdb5/modules/mal/Tests/hashpartition.malC
        monetdb5/modules/mal/Tests/hashpartition.stable.out
        monetdb5/modules/mal/partition.c
        monetdb5/modules/mal/partition.h
        monetdb5/modules/mal/partition.mal
Branch: partition
Log Message:

Added single key partitioning
An alternative is to cut out the partitions by a single hash value


diffs (truncated from 325 to 300 lines):

diff --git a/clients/Tests/MAL-signatures.stable.out 
b/clients/Tests/MAL-signatures.stable.out
--- a/clients/Tests/MAL-signatures.stable.out
+++ b/clients/Tests/MAL-signatures.stable.out
@@ -8228,7 +8228,9 @@ Ready.
 [ "optimizer", "wlc",  "pattern optimizer.wlc():str ", "OPTwrapper;",  ""      
]
 [ "optimizer", "wlc",  "pattern optimizer.wlc(mod:str, fcn:str):str ", 
"OPTwrapper;",  "Inject the workload capture-replay primitives."        ]
 [ "partition", "hash", "pattern partition.hash(b:bat[:any_2]):bat[:any_2]... 
",        "PARThash;",    "Perform a value based partition "      ]
+[ "partition", "hash", "pattern partition.hash(b:bat[:any_2], key:int, 
pieces:int):bat[:any_2] ",      "PARThashkeyed;",       "Perform a value based 
partition "      ]
 [ "partition", "slice",        "pattern 
partition.slice(b:bat[:any_2]):bat[:oid]... ", "PARTslice;",   "Perform a value 
based partition to produce candidate lists "   ]
+[ "partition", "slice",        "pattern partition.slice(b:bat[:any_2], 
key:int, pieces:int):bat[:oid] ",       "PARTslicekeyed;",      "Perform a 
value based partition to produce candidate lists "   ]
 [ "pcre",      "imatch",       "command pcre.imatch(s:str, pat:str):bit ",     
"PCREimatch;",  "Caseless Perl Compatible Regular Expression pattern matching 
against a string" ]
 [ "pcre",      "index",        "command pcre.index(pat:pcre, s:str):int ",     
"PCREindex;",   "match a pattern, return matched position (or 0 when not 
found)"        ]
 [ "pcre",      "match",        "command pcre.match(s:str, pat:str):bit ",      
"PCREmatch;",   "Perl Compatible Regular Expression pattern matching against a 
string"  ]
diff --git a/clients/Tests/MAL-signatures.stable.out.int128 
b/clients/Tests/MAL-signatures.stable.out.int128
--- a/clients/Tests/MAL-signatures.stable.out.int128
+++ b/clients/Tests/MAL-signatures.stable.out.int128
@@ -10590,7 +10590,9 @@ Ready.
 [ "optimizer", "wlc",  "pattern optimizer.wlc():str ", "OPTwrapper;",  ""      
]
 [ "optimizer", "wlc",  "pattern optimizer.wlc(mod:str, fcn:str):str ", 
"OPTwrapper;",  "Inject the workload capture-replay primitives."        ]
 [ "partition", "hash", "pattern partition.hash(b:bat[:any_2]):bat[:any_2]... 
",        "PARThash;",    "Perform a value based partition "      ]
+[ "partition", "hash", "pattern partition.hash(b:bat[:any_2], key:int, 
pieces:int):bat[:any_2] ",      "PARThashkeyed;",       "Perform a value based 
partition "      ]
 [ "partition", "slice",        "pattern 
partition.slice(b:bat[:any_2]):bat[:oid]... ", "PARTslice;",   "Perform a value 
based partition to produce candidate lists "   ]
+[ "partition", "slice",        "pattern partition.slice(b:bat[:any_2], 
key:int, pieces:int):bat[:oid] ",       "PARTslicekeyed;",      "Perform a 
value based partition to produce candidate lists "   ]
 [ "pcre",      "imatch",       "command pcre.imatch(s:str, pat:str):bit ",     
"PCREimatch;",  "Caseless Perl Compatible Regular Expression pattern matching 
against a string" ]
 [ "pcre",      "index",        "command pcre.index(pat:pcre, s:str):int ",     
"PCREindex;",   "match a pattern, return matched position (or 0 when not 
found)"        ]
 [ "pcre",      "match",        "command pcre.match(s:str, pat:str):bit ",      
"PCREmatch;",   "Perl Compatible Regular Expression pattern matching against a 
string"  ]
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1597,7 +1597,9 @@ str OPTvolcanoImplementation(Client cntx
 str OPTwlcImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 str OPTwrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
+str PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr p);
 str PCREilike2(bit *ret, const str *s, const str *pat);
 str PCREilike3(bit *ret, const str *s, const str *pat, const str *esc);
 str PCREimatch(bit *ret, const str *val, const str *pat);
diff --git a/monetdb5/modules/mal/Tests/hashpartition.malC 
b/monetdb5/modules/mal/Tests/hashpartition.malC
--- a/monetdb5/modules/mal/Tests/hashpartition.malC
+++ b/monetdb5/modules/mal/Tests/hashpartition.malC
@@ -42,6 +42,8 @@ io.print(c2);
 io.print(c3);
 io.print(c4);
 io.print(c5);
+k1 := partition.hash(b,0,5);
+io.print(k1);
 
 (o1,o2):= partition.slice(b);
 io.print(o1);
@@ -53,6 +55,8 @@ io.print(o2);
 io.print(o3);
 io.print(o4);
 io.print(o5);
+s1 := partition.slice(b,0,5);
+io.print(s1);
 
 end tst;
 
diff --git a/monetdb5/modules/mal/Tests/hashpartition.stable.out 
b/monetdb5/modules/mal/Tests/hashpartition.stable.out
--- a/monetdb5/modules/mal/Tests/hashpartition.stable.out
+++ b/monetdb5/modules/mal/Tests/hashpartition.stable.out
@@ -150,6 +150,16 @@ Ready.
 [ 5@0, 29      ]
 #--------------------------#
 # h    t  # name
+# void int  # type
+#--------------------------#
+[ 0@0, 0       ]
+[ 1@0, 5       ]
+[ 2@0, 10      ]
+[ 3@0, 15      ]
+[ 4@0, 20      ]
+[ 5@0, 25      ]
+#--------------------------#
+# h    t  # name
 # void oid  # type
 #--------------------------#
 [ 0@0, 0@0     ]
@@ -236,6 +246,16 @@ Ready.
 [ 3@0, 19@0    ]
 [ 4@0, 24@0    ]
 [ 5@0, 29@0    ]
+#--------------------------#
+# h    t  # name
+# void oid  # type
+#--------------------------#
+[ 0@0, 0@0     ]
+[ 1@0, 5@0     ]
+[ 2@0, 10@0    ]
+[ 3@0, 15@0    ]
+[ 4@0, 20@0    ]
+[ 5@0, 25@0    ]
 
 # 14:09:25 >  
 # 14:09:25 >  "Done."
diff --git a/monetdb5/modules/mal/partition.c b/monetdb5/modules/mal/partition.c
--- a/monetdb5/modules/mal/partition.c
+++ b/monetdb5/modules/mal/partition.c
@@ -29,6 +29,19 @@
         }                           \
     } while (0)
 
+#define keyedhashpartition(TYPE)                                       \
+    do { p= BATcount(b);                                               \
+        bi = bat_iterator(b);                                  \
+        for (r=0; r < p; r++) {                                \
+            TYPE *v = (TYPE *) BUNtloc(bi, r);                 \
+           TYPE c =  mix_##TYPE((*v));                         \
+           if((c % pieces) == key && BUNappend(bn, (void*) v, FALSE) != 
GDK_SUCCEED){ \
+                   msg = createException(MAL,"partition.hash","Storage 
failed");\
+                   break;\
+           }\
+        }                           \
+    } while (0)
+
 #define slicepartition(TYPE)                                   \
     do { p= BATcount(b);                                               \
         bi = bat_iterator(b);                                  \
@@ -42,10 +55,24 @@
         }                           \
     } while (0)
 
+#define keyedslicepartition(TYPE)                                      \
+    do { p= BATcount(b);                                               \
+        bi = bat_iterator(b);                                  \
+        for (r=0; r < p; r++) {                                \
+            TYPE *v = (TYPE *) BUNtloc(bi, r);                 \
+           TYPE c =  mix_##TYPE((*v));                         \
+           if( (c % pieces) == key && BUNappend(bn, (void*) &r, FALSE) != 
GDK_SUCCEED){ \
+                   msg = createException(MAL,"partition.slice","Storage 
failed");\
+                   break;\
+           }\
+        }                           \
+    } while (0)
+
 str
 PARThash(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        bat *ret = getArgReference_bat(stk,pci,0);
+       int bid = *getArgReference_bat(stk,pci,pci->retc);
        int     i, pieces;
        BAT *b, *bn[MAXPART];
        BUN r, p; 
@@ -58,7 +85,7 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
        pieces = pci->retc;
        if ( pieces >= MAXPART)
                throw(MAL,"partition.hash","too many partitions");
-       b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+       b = BATdescriptor( bid);
        if ( b == NULL)
                throw(MAL, "partition.hash", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
 
@@ -111,11 +138,73 @@ PARThash(Client cntxt, MalBlkPtr mb, Mal
        BBPunfix(b->batCacheid);
        return msg;
 }
+str
+PARThashkeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       bat *ret = getArgReference_bat(stk,pci,0);
+       int bid = *getArgReference_bat(stk,pci,1);
+       int key = *getArgReference_int(stk,pci,2);
+       int pieces = *getArgReference_int(stk,pci,3);
+       BAT *b, *bn;
+       BUN r, p; 
+       BATiter bi;
+       str msg = MAL_SUCCEED;
+
+
+       (void) cntxt;
+       (void) mb;
+       if ( pieces >= MAXPART || pieces < 1)
+               throw(MAL,"partition.slice","too many partitions");
+       b = BATdescriptor( bid);
+       if ( b == NULL)
+               throw(MAL, "partition.hash", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
+
+       size_t cap = BATcount(b) / pieces * 1.1;
+       bn = COLnew(0, b->ttype, cap, TRANSIENT);
+       if (bn == NULL){
+               BBPunfix(b->batCacheid);
+               throw(MAL, "partition.hash", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+       }
+       if ( b->tvheap && bn->tvheap && ATOMstorage(b->ttype) != TYPE_str){
+               if (HEAPextend(bn->tvheap, b->tvheap->size, TRUE) != 
GDK_SUCCEED) {
+                       BBPunfix(b->batCacheid);
+                       BBPunfix(bn->batCacheid);
+                       throw(MAL, "partition.hash", SQLSTATE(HY001) 
MAL_MALLOC_FAIL);
+               }
+       }
+       /* distribute the elements over multiple partitions */
+       switch(ATOMstorage(b->ttype)){
+               case TYPE_bte:
+                       keyedhashpartition(bte);
+                       break;
+               case TYPE_sht:
+                       keyedhashpartition(sht);
+                       break;
+               case TYPE_int:
+                       keyedhashpartition(int);
+                       break;
+               case TYPE_lng:
+                       keyedhashpartition(lng);
+                       break;
+#ifdef HAVE_HGE
+               case TYPE_hge:
+                       keyedhashpartition(hge);
+                       break;
+#endif
+               default:
+                       msg = 
createException(MAL,"partition.hash","Non-supported type");
+       }
+
+       BBPkeepref(*ret = bn->batCacheid);
+       BBPunfix(b->batCacheid);
+       return msg;
+}
 
 str
 PARTslice(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        bat *ret = getArgReference_bat(stk,pci,0);
+       int bid = *getArgReference_bat(stk,pci,pci->retc);
        int     i, pieces;
        BAT *b, *bn[MAXPART];
        BUN r, p; 
@@ -128,7 +217,7 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
        pieces = pci->retc;
        if ( pieces >= MAXPART)
                throw(MAL,"partition.slice","too many partitions");
-       b = BATdescriptor( stk->stk[getArg(pci, pci->retc)].val.ival);
+       b = BATdescriptor( bid);
        if ( b == NULL)
                throw(MAL, "partition.slice", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
 
@@ -172,3 +261,57 @@ PARTslice(Client cntxt, MalBlkPtr mb, Ma
        BBPunfix(b->batCacheid);
        return msg;
 }
+
+str
+PARTslicekeyed(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       bat *ret = getArgReference_bat(stk,pci,0);
+       int bid = *getArgReference_bat(stk,pci,1);
+       int key = *getArgReference_int(stk,pci,2);
+       int pieces = *getArgReference_int(stk,pci,3);
+       BAT *b, *bn;
+       BUN r, p; 
+       BATiter bi;
+       str msg = MAL_SUCCEED;
+
+       (void) cntxt;
+       (void) mb;
+       if ( pieces >= MAXPART || pieces < 1)
+               throw(MAL,"partition.slice","too many partitions");
+       b = BATdescriptor( bid);
+       if ( b == NULL)
+               throw(MAL, "partition.slice", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
+
+       size_t cap = BATcount(b) / pieces * 1.1;
+       bn = COLnew(0, TYPE_oid, cap, TRANSIENT);
+       if (bn == NULL){
+               BBPunfix(b->batCacheid);
+               throw(MAL, "partition.slice", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+       }
+       /* distribute the elements over multiple partitions */
+       switch(ATOMstorage(b->ttype)){
+               case TYPE_bte:
+                       keyedslicepartition(bte);
+                       break;
+               case TYPE_sht:
+                       keyedslicepartition(sht);
+                       break;
+               case TYPE_int:
+                       keyedslicepartition(int);
+                       break;
+               case TYPE_lng:
+                       keyedslicepartition(lng);
+                       break;
+#ifdef HAVE_HGE
+               case TYPE_hge:
+                       keyedslicepartition(hge);
+                       break;
+#endif
+               default:
+                       msg = 
createException(MAL,"partition.slice","Non-supported type");
+       }
+
+       BBPkeepref(*ret = bn->batCacheid);
+       BBPunfix(b->batCacheid);
+       return msg;
+}
diff --git a/monetdb5/modules/mal/partition.h b/monetdb5/modules/mal/partition.h
--- a/monetdb5/modules/mal/partition.h
+++ b/monetdb5/modules/mal/partition.h
@@ -14,6 +14,8 @@
 #include "mal_interpreter.h"
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to