Changeset: 54050545e932 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=54050545e932
Modified Files:
monetdb5/extras/crackers/crackers_core_unordered.mx
monetdb5/extras/crackers/crackers_selectholpl_ops.mx
Branch: holindex
Log Message:
Implement a parallel copy for the cracker bat.
diffs (148 lines):
diff --git a/monetdb5/extras/crackers/crackers_core_unordered.mx
b/monetdb5/extras/crackers/crackers_core_unordered.mx
--- a/monetdb5/extras/crackers/crackers_core_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_core_unordered.mx
@@ -112,6 +112,7 @@ crackers_export str CRKparallelscan_@1 (
@= crackInTwoUnorderedPieces_decl
str CRKcrackUnorderedZero_@2_@1( BAT *b, @1 mval, oid first, oid last, oid
*pos, int nthreads, int vector_elements);
str CRKparallelscan_@2_@1( BAT *b, BAT *ob, @1 mval, oid first, oid last, int
nthreads);
+str CRKparallelcopy_@2_@1( BAT *b, BAT *ob, oid first, oid last, int nthreads);
@
@= crackInThreeUnorderedPieces_decl
str CRKcrackUnorderedThree_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid first, oid
last, oid *posl, oid *posh);
@@ -1762,6 +1763,116 @@ CRKparallelscan_@2_@1( BAT *b, BAT *ob,
return msg;
}
+static str CRKcopy_x_@2_@1 (
+ /* input */
+ const BAT* crackerbat, /* attribute (array) */
+ BAT* basebat, /*output (array)*/
+ BUN first, /* first position of to-be-cracked piece */
+ BUN last /* last position of to-be-cracked piece */
+) {
+ @1 *src_t_crackerbat = (@1 *) Tloc(crackerbat, BUNfirst(crackerbat));
+ @1 *src_t_basebat = (@1 *) Tloc(basebat, BUNfirst(basebat));
+ oid *src_h_crackerbat = (oid *) Hloc(crackerbat, BUNfirst(crackerbat));
+ oid *src_h_basebat = (oid *) Hloc(basebat, BUNfirst(basebat));
+ const BUN n = last - first + 1;
+ const BUN snt = n * sizeof(@1);
+ const BUN snh = n * sizeof(oid);
+
+ memcpy(&src_h_crackerbat[first], &src_h_basebat[first], snt);
+ memcpy(&src_t_crackerbat[first], &src_t_basebat[first], snh);
+
+ return MAL_SUCCEED;
+}
+/* crackThread for new multi-threaded crack code */
+static void*
+copy_MT_Thread_@2_@1 ( void *arg_p )
+{
+ s_Thread_t *arg = (s_Thread_t*) arg_p;
+
+ /* call actual cracking routine for this slice */
+ arg->msg = CRKcopy_x_@2_@1 ( arg->b, arg->ob, arg->first, arg->last);
+
+ pthread_exit(NULL);
+ return NULL;
+}
+static str CRKcopy_MT_@2_@1 (const BAT *b, BAT *ob, BUN first, BUN last, int
nthreads)
+{
+ BUN n = last - first + 1; /* total # tuples / values */
+ BUN mm; /* # tuples / values in tmp arrays */
+ BUN f, l; /* first / last BUN per slice */
+ pthread_t *s_Thread; /* threads array */
+ s_Thread_t *s_Thread_arg; /* thread arguments array */
+ int i;
+
+ /* adjust nthreads */
+ if (nthreads == 0) {
+ /* automatic setting */
+ nthreads = GDKnr_threads;
+ }
+ if ((BUN) nthreads > n / 10) {
+ /* more threads / smaller slices does not make sense */
+ nthreads = (int) (n / 1000) + 1;
+ }
+
+ mm = (n / nthreads);
+
+ s_Thread = GDKmalloc (nthreads * sizeof(pthread_t));
+ s_Thread_arg = GDKmalloc(nthreads * sizeof(s_Thread_t));
+ if (!s_Thread || !s_Thread_arg) {
+ if (s_Thread)
+ GDKfree(s_Thread);
+ if (s_Thread_arg)
+ GDKfree(s_Thread_arg);
+ throw (MAL, "crackers.parallelcopy", "CRKcopy_MT_@2_@1():
GDKmalloc() failed.");
+ }
+
+ /* initialize crackThread arguments */
+ /* Alternative 1: each thread cracks one consecutive slice */
+ for (i = 0, f = first, l = f + mm - 1; i < nthreads; i++, f += mm, l
+= mm) {
+ s_Thread_arg[i].b = b;
+ s_Thread_arg[i].first = f;
+ s_Thread_arg[i].last = (i < nthreads - 1) ? l : last;
+ s_Thread_arg[i].ob = ob;
+ }
+
+ /* spawn crackThreads */
+ for (i = 0; i < nthreads; i++) {
+ if (pthread_create(&s_Thread[i], NULL, copy_MT_Thread_@2_@1,
&s_Thread_arg[i])) {
+ GDKfree(s_Thread);
+ GDKfree(s_Thread_arg);
+ throw (MAL, "crackers.select", "CRKcopy_MT_@2_@1():
Failed spaning crackThread %d.", i);
+ }
+ }
+ /* join crackThreads */
+ for (i = 0; i < nthreads; i++) {
+ if (pthread_join(s_Thread[i], NULL)) {
+ GDKfree(s_Thread);
+ GDKfree(s_Thread_arg);
+ throw (MAL, "crackers.select", "CRKcopy_MT_@2_@1():
Failed joining crackThread %d.", i);
+ }
+ }
+ /* check for & report failing crackThreads */
+ for (i = 0; i < nthreads; i++) {
+ if (s_Thread_arg[i].msg != MAL_SUCCEED) {
+ GDKfree(s_Thread);
+ GDKfree(s_Thread_arg);
+ throw (MAL, "crackers.select", "CRKcopy_MT_@2_@1():
crackThread %d failed with '%s'.", i, s_Thread_arg[i].msg);
+ }
+ }
+ return MAL_SUCCEED;
+}
+
+/* parallel copy dispatcher*/
+str
+CRKparallelcopy_@2_@1( BAT *b, BAT *ob, BUN first, BUN last, int nthreads){
+ str msg = MAL_SUCCEED;
+ assert(b && ob);
+ assert(last >= first);
+
+ msg = CRKcopy_MT_@2_@1(b, ob, first, last, nthreads);
+
+ return msg;
+}
@
@= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
--- a/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
+++ b/monetdb5/extras/crackers/crackers_selectholpl_ops.mx
@@ -1412,9 +1412,13 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
}
gettimeofday(&tv0_copy, 0);
- b = BATcopy(bo, bo->htype, bo->ttype, TRUE);
- if ( bo->htype == TYPE_void)
- b = BATmaterializeh(b);
+ //b = BATcopy(bo, bo->htype, bo->ttype, TRUE);
+ //if ( bo->htype == TYPE_void)
+ // b = BATmaterializeh(b);
+ b=BATnew(TYPE_oid, bo->ttype,BATcount(bo));
+ BATkey(BATmirror(b),FALSE);
+ BATsetcount(b,BATcount(bo));
+ CRKparallelcopy_LE_@1(b, bo, (BUN) 0, BATcount(bo)-1, nthreads);
gettimeofday(&tv1_copy, 0);
fprintf(ofp_copy,"%d\n",(int)dt(tv0_copy,tv1_copy)*1000000);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list