Changeset: 54050545e932 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=54050545e932
Modified Files:
        monetdb5/extras/crackers/crackers_core_unordered.mx
        monetdb5/extras/crackers/crackers_selectholpl_ops.mx
Branch: holindex
Log Message:

Implement a parallel copy for the cracker bat.


diffs (148 lines):

diff --git a/monetdb5/extras/crackers/crackers_core_unordered.mx 
b/monetdb5/extras/crackers/crackers_core_unordered.mx
--- a/monetdb5/extras/crackers/crackers_core_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_core_unordered.mx
@@ -112,6 +112,7 @@ crackers_export str CRKparallelscan_@1 (
 @= crackInTwoUnorderedPieces_decl
 str CRKcrackUnorderedZero_@2_@1( BAT *b, @1 mval, oid first, oid last, oid 
*pos, int nthreads, int vector_elements);
 str CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, oid first, oid last, int 
nthreads);
+str CRKparallelcopy_@2_@1( BAT *b, BAT *ob, oid first, oid last, int nthreads);
 @
 @= crackInThreeUnorderedPieces_decl
 str CRKcrackUnorderedThree_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid 
last, oid *posl, oid *posh);
@@ -1762,6 +1763,116 @@ CRKparallelscan_@2_@1( BAT *b, BAT *ob, 
 
        return msg;
 }
+/*
+ * Copy the BUNs at positions [first, last] (both inclusive) of basebat into
+ * the same positions of crackerbat.  The oid head column and the @1 tail
+ * column are each copied with one memcpy.
+ *
+ * No bounds checking is done here: both BATs must already be large enough
+ * to hold position `last`.  Always returns MAL_SUCCEED.
+ *
+ * NOTE(review): the head of basebat is read through Hloc() as an array of
+ * oid.  Presumably a TYPE_void head has no such array and must be
+ * materialized by the caller first -- confirm.
+ */
+static str CRKcopy_x_@2_@1 (
+       const BAT* crackerbat, /* destination; declared const, but its heaps are written */
+       BAT* basebat,          /* source */
+       BUN first,             /* first position to copy */
+       BUN last               /* last position to copy (inclusive) */
+) {
+       @1  *dst_tail = (@1 *) Tloc(crackerbat, BUNfirst(crackerbat));
+       @1  *src_tail = (@1 *) Tloc(basebat, BUNfirst(basebat));
+       oid *dst_head = (oid *) Hloc(crackerbat, BUNfirst(crackerbat));
+       oid *src_head = (oid *) Hloc(basebat, BUNfirst(basebat));
+       const BUN n = last - first + 1;
+       const BUN head_bytes = n * sizeof(oid);
+       const BUN tail_bytes = n * sizeof(@1);
+
+       /* each column is copied with the byte count of its own element type */
+       memcpy(&dst_head[first], &src_head[first], head_bytes);
+       memcpy(&dst_tail[first], &src_tail[first], tail_bytes);
+
+       return MAL_SUCCEED;
+}
+/*
+ * Thread entry point for the multi-threaded copy: arg_p points to the
+ * s_Thread_t describing the slice this thread has to copy.
+ */
+static void*
+copy_MT_Thread_@2_@1 ( void *arg_p )
+{
+       s_Thread_t *slice = arg_p;
+
+       /* copy this slice and leave the outcome in the argument record */
+       slice->msg = CRKcopy_x_@2_@1 ( slice->b, slice->ob, slice->first, slice->last);
+
+       pthread_exit(NULL);
+       /* pthread_exit() does not return; this only satisfies the signature */
+       return NULL;
+}
+/*
+ * Copy positions [first, last] (both inclusive) of ob into b using worker
+ * threads, each running copy_MT_Thread_@2_@1 on one consecutive slice.
+ *
+ * nthreads == 0 selects GDKnr_threads.  The thread count is lowered when
+ * slices would become too small and is never allowed to drop below 1.
+ *
+ * Returns MAL_SUCCEED, or a MAL exception when allocation, thread creation,
+ * thread joining or one of the workers fails.  The bookkeeping arrays are
+ * released on every path, and only after every spawned worker has been
+ * joined, because the workers read their arguments from those arrays.
+ */
+static str CRKcopy_MT_@2_@1 (const BAT *b, BAT *ob, BUN first, BUN last, int nthreads)
+{
+       BUN n = last - first + 1;  /* total # tuples / values */
+       BUN mm;                    /* # tuples / values per slice */
+       BUN f, l;                  /* first / last BUN per slice */
+       pthread_t *s_Thread;       /* threads array */
+       s_Thread_t *s_Thread_arg;  /* thread arguments array */
+       str failmsg = MAL_SUCCEED; /* message of the first failing worker */
+       int spawned = 0;           /* # threads actually created */
+       int create_failed = -1;    /* index of the thread that could not be created */
+       int join_failed = -1;      /* index of the first thread that could not be joined */
+       int work_failed = -1;      /* index of the first worker that reported an error */
+       int i;
+
+       /* adjust nthreads */
+       if (nthreads == 0) {
+               /* automatic setting */
+               nthreads = GDKnr_threads;
+       }
+       if ((BUN) nthreads > n / 10) {
+               /* more threads / smaller slices does not make sense */
+               nthreads = (int) (n / 1000) + 1;
+       }
+       if (nthreads < 1) {
+               /* defensive: nthreads is used as a divisor and an array size */
+               nthreads = 1;
+       }
+
+       mm = (n / nthreads);
+
+       s_Thread = GDKmalloc (nthreads * sizeof(pthread_t));
+       s_Thread_arg  = GDKmalloc(nthreads * sizeof(s_Thread_t));
+       if (!s_Thread || !s_Thread_arg) {
+               if (s_Thread)
+                       GDKfree(s_Thread);
+               if (s_Thread_arg)
+                       GDKfree(s_Thread_arg);
+               throw (MAL, "crackers.parallelcopy", "CRKcopy_MT_@2_@1(): GDKmalloc() failed.");
+       }
+
+       /* initialize thread arguments: each thread copies one consecutive
+        * slice; the last slice also takes the remainder of the division */
+       for (i = 0, f = first, l = f + mm - 1; i < nthreads; i++, f += mm, l += mm) {
+               s_Thread_arg[i].b       = b;
+               s_Thread_arg[i].first   = f;
+               s_Thread_arg[i].last    = (i < nthreads - 1) ? l : last;
+               s_Thread_arg[i].ob      = ob;
+               s_Thread_arg[i].msg     = MAL_SUCCEED;
+       }
+
+       /* spawn the threads; stop at the first failure but remember how
+        * many are running so that they can still be joined */
+       for (i = 0; i < nthreads; i++) {
+               if (pthread_create(&s_Thread[i], NULL, copy_MT_Thread_@2_@1, &s_Thread_arg[i])) {
+                       create_failed = i;
+                       break;
+               }
+               spawned++;
+       }
+
+       /* join every thread that was started, even after a failure, and
+        * pick up the first worker error on the way */
+       for (i = 0; i < spawned; i++) {
+               if (pthread_join(s_Thread[i], NULL)) {
+                       if (join_failed < 0) {
+                               join_failed = i;
+                       }
+               } else if (work_failed < 0 && s_Thread_arg[i].msg != MAL_SUCCEED) {
+                       work_failed = i;
+                       /* keep only the pointer: the string is not part of
+                        * the argument array that is freed below */
+                       failmsg = s_Thread_arg[i].msg;
+               }
+       }
+
+       /* no worker is using the arrays any more */
+       GDKfree(s_Thread);
+       GDKfree(s_Thread_arg);
+
+       if (create_failed >= 0) {
+               throw (MAL, "crackers.parallelcopy", "CRKcopy_MT_@2_@1(): Failed spawning crackThread %d.", create_failed);
+       }
+       if (join_failed >= 0) {
+               throw (MAL, "crackers.parallelcopy", "CRKcopy_MT_@2_@1(): Failed joining crackThread %d.", join_failed);
+       }
+       if (work_failed >= 0) {
+               throw (MAL, "crackers.parallelcopy", "CRKcopy_MT_@2_@1(): crackThread %d failed with '%s'.", work_failed, failmsg);
+       }
+       return MAL_SUCCEED;
+}
+
+/*
+ * Parallel copy dispatcher: copies positions [first, last] of ob into b by
+ * handing the request to the multi-threaded implementation and returning
+ * its result unchanged.
+ *
+ * NOTE(review): the prototype in the _decl section declares first/last as
+ * oid while this definition uses BUN -- confirm the two types agree.
+ */
+str
+CRKparallelcopy_@2_@1( BAT *b, BAT *ob, BUN first, BUN last, int nthreads){
+       /* both BATs are required and the range must not be inverted */
+       assert(b && ob);
+       assert(last >= first);
+
+       return CRKcopy_MT_@2_@1(b, ob, first, last, nthreads);
+}
 
 @
 @= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx 
b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
--- a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
+++ b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
@@ -1412,9 +1412,13 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
                }
 
                gettimeofday(&tv0_copy, 0);
-               b = BATcopy(bo, bo->htype, bo->ttype, TRUE);
-               if ( bo->htype == TYPE_void)
-                       b = BATmaterializeh(b);
+               //b = BATcopy(bo, bo->htype, bo->ttype, TRUE);
+               //if ( bo->htype == TYPE_void)
+               //      b = BATmaterializeh(b);
+               b=BATnew(TYPE_oid, bo->ttype,BATcount(bo));
+               BATkey(BATmirror(b),FALSE);
+                BATsetcount(b,BATcount(bo));
+               CRKparallelcopy_LE_@1(b, bo, (BUN) 0, BATcount(bo)-1, nthreads);
                gettimeofday(&tv1_copy, 0);
                fprintf(ofp_copy,"%d\n",(int)dt(tv0_copy,tv1_copy)*1000000);
 
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to