Changeset: 2f6932bb2a51 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2f6932bb2a51
Modified Files:
monetdb5/extras/crackers/crackers_multicore_unordered.mx
monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:
Add multicore crackThree.
diffs (truncated from 433 to 300 lines):
diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
--- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
@@ -81,6 +81,36 @@ typedef struct {
int tid;
} thread_data_restore_t;
+typedef struct { /* per-thread argument block filled by CRKscanUnorderedThreeParallel and read by threadFuncScanThree */
+ BAT *array; /* BAT being processed; every thread gets the same pointer */
+ int *temp_array; /* scratch value buffer handed through by the launcher; NOTE(review): the launcher passes a pointer to the templated element type here -- confirm int is intended */
+ oid *temp_array_oid; /* scratch oid buffer handed through by the launcher */
+ int *data_less; /* shared counter array; the scan thread increments slot [tid] for elements it classifies into the low piece */
+ int *data_greater; /* shared counter array; slot [tid] is incremented for elements classified into the high piece */
+ int *data_med; /* shared counter array; slot [tid] is incremented for elements classified into the middle piece */
+ oid left; /* first position of this thread's slice, relative to BUNfirst(array) */
+ oid right; /* last position of this thread's slice (inclusive) */
+ int tid; /* thread index, 0 .. nthreads-1; also the index into the counter arrays */
+ int low; /* lower pivot; NOTE(review): assigned from a value of the templated element type -- confirm int is wide enough */
+ int hgh; /* upper pivot; same type caveat as low */
+} thread_data_lh_t;
+
+typedef struct { /* per-thread argument block filled by CRKreorganizeUnorderedThreeParallel for threadFuncReorganizeThree */
+ BAT *array; /* BAT being processed; every thread gets the same pointer */
+ int *temp_array; /* scratch value buffer handed through by the launcher; NOTE(review): the launcher passes a pointer to the templated element type here -- confirm int is intended */
+ oid *temp_array_oid; /* scratch oid buffer handed through by the launcher */
+ int *data_less; /* per-thread counter array handed through by the launcher (one slot per thread) */
+ int *data_greater; /* per-thread counter array handed through by the launcher */
+ int *data_med; /* per-thread counter array handed through by the launcher */
+ int nthreads; /*number of threads*/
+ int n; /*number of array elements*/
+ oid left; /* first position of this thread's slice, relative to BUNfirst(array) */
+ oid right; /* last position of this thread's slice (inclusive) */
+ int tid; /* thread index, 0 .. nthreads-1 */
+ int low; /* lower pivot; NOTE(review): assigned from a value of the templated element type -- confirm int is wide enough */
+ int hgh; /* upper pivot; same type caveat as low */
+} thread_data_lh_new_t;
+
/* Signatures shared within the crackers module/library */
@:TypeSwitch(operations,_decl)@
#endif
@@ -114,6 +144,12 @@ void *threadFuncRestore_@2_@1(void *arg)
@
@= crackInThreeUnorderedPieces_decl
str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid
first, oid last, oid *posl, oid *posh);
+str CRKscanUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int
*data_less, int *data_greater, int *data_med); /* launcher: runs threadFuncScanThree on nthreads slices of [first,last] */
+str CRKreorganizeUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int
*data_less, int *data_greater, int *data_med); /* launcher: runs threadFuncReorganizeThree on nthreads slices of [first,last] */
+str CRKrestoreUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int
*data_less, int *data_greater, int *data_med, oid *posl, oid *posh); /* launcher: runs threadFuncRestoreThree, then writes the piece boundaries to *posl and *posh */
+void *threadFuncScanThree_@2_@3_@1(void *arg); /* pthread start routine; arg is a thread_data_lh_t* */
+void *threadFuncReorganizeThree_@2_@3_@1(void *arg); /* pthread start routine; arg is a thread_data_lh_new_t* */
+void *threadFuncRestoreThree_@2_@3_@1(void *arg); /* pthread start routine; arg is a thread_data_restore_t* */
str CRKcrackUnorderedThreeCopyParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid
first, oid last, oid *posl, oid *posh, BAT *bc);
@
@@ -525,8 +561,315 @@ CRKcrackUnorderedThreeParallel_@2_@3_@1(
return MAL_SUCCEED;
}
+/*
+ * Scan phase of the parallel three-way crack.
+ *
+ * Splits the position range [first, last] of b into nthreads contiguous
+ * slices (the last slice absorbs the division remainder), runs
+ * threadFuncScanThree on every slice and waits for all threads to finish.
+ * Each thread increments data_less[tid], data_med[tid] or data_greater[tid]
+ * once per element of its slice; the counters are only incremented here, so
+ * the caller has to hand them in initialised.
+ *
+ * temp_array and temp_array_oid are only stored in the per-thread argument
+ * blocks.  nthreads must be at least 1 (it is used as a divisor).
+ * Always returns MAL_SUCCEED; a failing pthread_create or pthread_join
+ * terminates the process.
+ *
+ * NOTE(review): allocation results are not checked, and thread_data_lh_t
+ * stores low/hgh as int although they arrive in the templated element type.
+ */
+str
+CRKscanUnorderedThreeParallel_@2_@3_@1(BAT *b, @1 *temp_array, oid *temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int *data_less, int *data_greater, int *data_med)
+{
+	int i;
+	oid partition_elements;
+	thread_data_lh_t *data;
+	pthread_t *thread;
+
+	/* one flat array of thread handles instead of one allocation per handle */
+	thread = GDKmalloc(nthreads * sizeof(*thread));
+	data = GDKmalloc(nthreads * sizeof(*data));
+
+	partition_elements = (last - first + 1) / nthreads;
+	for (i = 0; i < nthreads; i++) {
+		data[i].array = b;
+		data[i].temp_array = temp_array;
+		data[i].temp_array_oid = temp_array_oid;
+		data[i].data_less = data_less;
+		data[i].data_greater = data_greater;
+		data[i].data_med = data_med;
+		data[i].left = first + i * partition_elements;
+		data[i].low = low;
+		data[i].hgh = hgh;
+		/* the last thread also takes the elements left over by the division */
+		if (i == (nthreads - 1))
+			data[i].right = last;
+		else
+			data[i].right = first + (i * partition_elements) + partition_elements - 1;
+		data[i].tid = i;
+
+		if (pthread_create(&thread[i], NULL, threadFuncScanThree_@2_@3_@1, &data[i])) {
+			printf("Failed to create thread %d.\n",i);
+			exit (-1);
+		}
+	}
+
+	/* Wait for threads */
+	for (i = 0; i < nthreads; i++) {
+		if (pthread_join(thread[i], NULL)) {
+			printf ("Error joining thread %d.\n", i);
+			exit (-1);
+		}
+	}
+
+	/* Clean up: the argument blocks are dead once every thread is joined. */
+	GDKfree(data);
+	GDKfree(thread);
+
+	return MAL_SUCCEED;
+}
+
+
+/*
+ * Reorganize phase of the parallel three-way crack.
+ *
+ * Splits the position range [first, last] of b into nthreads contiguous
+ * slices (the last slice absorbs the division remainder), runs
+ * threadFuncReorganizeThree on every slice and waits for all threads to
+ * finish.  Besides its slice, every thread is told the total element count
+ * (last - first + 1), the thread count, both pivots, the scratch buffers and
+ * the three per-thread counter arrays.
+ *
+ * nthreads must be at least 1 (it is used as a divisor).
+ * Always returns MAL_SUCCEED; a failing pthread_create or pthread_join
+ * terminates the process.
+ *
+ * NOTE(review): allocation results are not checked, and
+ * thread_data_lh_new_t stores low/hgh as int although they arrive in the
+ * templated element type.
+ */
+str
+CRKreorganizeUnorderedThreeParallel_@2_@3_@1(BAT *b, @1 *temp_array, oid *temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int *data_less, int *data_greater, int *data_med)
+{
+	int i;
+	oid partition_elements;
+	thread_data_lh_new_t *data;
+	pthread_t *thread;
+
+	/* one flat array of thread handles instead of one allocation per handle */
+	thread = GDKmalloc(nthreads * sizeof(*thread));
+	data = GDKmalloc(nthreads * sizeof(*data));
+
+	partition_elements = (last - first + 1) / nthreads;
+	for (i = 0; i < nthreads; i++) {
+		data[i].array = b;
+		data[i].temp_array = temp_array;
+		data[i].temp_array_oid = temp_array_oid;
+		data[i].data_less = data_less;
+		data[i].data_greater = data_greater;
+		data[i].data_med = data_med;
+		data[i].n = last - first + 1;
+		data[i].nthreads = nthreads;
+		data[i].left = first + i * partition_elements;
+		data[i].low = low;
+		data[i].hgh = hgh;
+		/* the last thread also takes the elements left over by the division */
+		if (i == (nthreads - 1))
+			data[i].right = last;
+		else
+			data[i].right = first + (i * partition_elements) + partition_elements - 1;
+		data[i].tid = i;
+
+		if (pthread_create(&thread[i], NULL, threadFuncReorganizeThree_@2_@3_@1, &data[i])) {
+			printf("Failed to create thread %d.\n",i);
+			exit (-1);
+		}
+	}
+
+	/* Wait for threads */
+	for (i = 0; i < nthreads; i++) {
+		if (pthread_join(thread[i], NULL)) {
+			printf ("Error joining thread %d.\n", i);
+			exit (-1);
+		}
+	}
+
+	/* Clean up: the argument blocks are dead once every thread is joined. */
+	GDKfree(data);
+	GDKfree(thread);
+
+	return MAL_SUCCEED;
+}
+
+
+/*
+ * Restore phase of the parallel three-way crack.
+ *
+ * Splits the position range [first, last] of b into nthreads contiguous
+ * slices (the last slice absorbs the division remainder), runs
+ * threadFuncRestoreThree on every slice and waits for all threads to finish.
+ * Each thread gets its slice twice: as positions in b (first/last) and as
+ * zero-based positions in the scratch buffers (tempfirst/templast).
+ *
+ * Afterwards the per-thread counters are summed (positionL = sum of
+ * data_less, positionM = sum of data_med) and the piece boundaries are
+ * written to *posl and *posh as offsets from BUNfirst(b).  data_greater is
+ * not used.  When both the low and the middle piece are empty, *posl and
+ * *posh are set to -1.
+ *
+ * nthreads must be at least 1 (it is used as a divisor).
+ * Always returns MAL_SUCCEED; a failing pthread_create or pthread_join
+ * terminates the process.
+ *
+ * NOTE(review): allocation results are not checked.  When positionL is 0 the
+ * element one position before `first` is read by the comparisons below --
+ * confirm that this position is always valid for the callers.
+ */
+str
+CRKrestoreUnorderedThreeParallel_@2_@3_@1(BAT *b, @1 *temp_array, oid *temp_array_oid, @1 low, @1 hgh, oid first, oid last, int nthreads, int *data_less, int *data_greater, int *data_med, oid *posl, oid *posh)
+{
+	int i;
+	int positionM = 0, positionH = 0, positionL = 0;
+	@1 *t0, *lt, *ht;
+	oid partition_elements;
+	thread_data_restore_t *data;
+	pthread_t *thread;
+
+	(void) data_greater;
+
+	/* one flat array of thread handles instead of one allocation per handle */
+	thread = GDKmalloc(nthreads * sizeof(*thread));
+	data = GDKmalloc(nthreads * sizeof(*data));
+
+	partition_elements = (last - first + 1) / nthreads;
+	for (i = 0; i < nthreads; i++) {
+		data[i].array = b;
+		data[i].temp_array = temp_array;
+		data[i].temp_array_oid = temp_array_oid;
+		data[i].nthreads = nthreads;
+		data[i].first = first + i * partition_elements;
+		data[i].tempfirst = i * partition_elements;
+		if (i == (nthreads - 1)) {
+			/* the last thread also takes the elements left over by the division */
+			data[i].last = last;
+			/* Scratch index of position `last`.  This used to be
+			 * partition_elements-1, which lies below tempfirst as soon as
+			 * more than one thread is used. */
+			data[i].templast = last - first;
+		} else {
+			data[i].last = first + (i * partition_elements) + partition_elements - 1;
+			data[i].templast = (i * partition_elements) + partition_elements - 1;
+		}
+		data[i].tid = i;
+
+		if (pthread_create(&thread[i], NULL, threadFuncRestoreThree_@2_@3_@1, &data[i])) {
+			printf("Failed to create thread %d.\n",i);
+			exit (-1);
+		}
+	}
+
+	/* Wait for threads */
+	for (i = 0; i < nthreads; i++) {
+		if (pthread_join(thread[i], NULL)) {
+			printf ("Error joining thread %d.\n", i);
+			exit (-1);
+		}
+	}
+
+	/* Clean up: the argument blocks are dead once every thread is joined. */
+	GDKfree(data);
+	GDKfree(thread);
+
+	/* total sizes of the low and the middle piece over all threads */
+	for (i = 0; i < nthreads; i++) {
+		positionL = positionL + data_less[i];
+		positionM = positionM + data_med[i];
+	}
+	positionH = positionL + positionM;
+
+	t0 = (@1 *)Tloc(b, BUNfirst(b));
+	lt = (@1 *)Tloc(b, BUNfirst(b) + first + positionL-1);
+	ht = (@1 *)Tloc(b, BUNfirst(b) + first + positionH-1);
+
+	if (positionH == 0) { /*left and middle piece empty*/
+		*posl = -1;
+		*posh = -1;
+		return MAL_SUCCEED;
+	}
+	if (positionL == 0) { /*left piece is empty*/
+		/* lt is the element just before `first`; its relation to the
+		 * pivots decides whether posl names that element or the next one */
+		if (@8_@7(lt, &hgh,@9@1)) {
+			*posl = (oid) (lt - t0);
+		}
+		else if (@8_@5(lt, &low,@9@1))
+			*posl = (oid) (lt - t0);
+		else
+			*posl = (oid) (lt - t0) + 1;
+		*posh = (oid) (ht - t0);
+		return MAL_SUCCEED;
+	}
+	if (positionM == 0) { /*middle piece empty*/
+		*posl = (oid) (lt - t0) + 1;
+		*posh = (oid) (ht - t0);
+		return MAL_SUCCEED;
+	}
+
+	*posl = (oid) (lt - t0);
+	*posh = (oid) (ht - t0);
+
+	return MAL_SUCCEED;
+}
+
+
+
+
+/*
+ * pthread start routine for the scan phase; arg is a thread_data_lh_t*.
+ *
+ * Walks the tail values of positions left..right (inclusive) of the BAT and
+ * classifies each one against the two pivots with the templated comparison
+ * macros: values accepted by both the low-pivot test and the high-pivot test
+ * count as "med", remaining values accepted by the high-pivot test count as
+ * "less", everything else as "greater".  The three totals are added to slot
+ * [tid] of data_med, data_less and data_greater.
+ *
+ * The totals are collected in locals and published once after the loop, so
+ * the threads do not write into the shared, adjacent counter slots on every
+ * element.  The slots are only added to, never reset here.
+ */
+void
+*threadFuncScanThree_@2_@3_@1(void *arg) {
+
+	thread_data_lh_t *my_data = (thread_data_lh_t *) arg;
+	@1 *ft;
+	oid *fh, *lh;
+	int nless = 0, nmed = 0, ngreater = 0;
+
+	/* set bounds for the iterator */
+	ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->left);
+	fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->left);
+	lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) + my_data->right);
+
+	/* the head pointers only delimit the walk; the tail value is what gets tested */
+	while (fh <= lh) {
+		if (@8_@5(ft, &my_data->low,@9@1) && @8_@6(ft, &my_data->hgh,@9@1)) {
+			nmed++;
+		}
+		else if (@8_@6(ft, &my_data->hgh,@9@1)) {
+			nless++;
+		}
+		else {
+			ngreater++;
+		}
+
+		ft++; fh++;
+	}
+
+	/* publish the per-slice totals with a single write per counter */
+	my_data->data_med[my_data->tid] += nmed;
+	my_data->data_less[my_data->tid] += nless;
+	my_data->data_greater[my_data->tid] += ngreater;
+
+	pthread_exit(NULL);
+}
+
+
+void
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list