Changeset: 552aeee10375 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=552aeee10375
Modified Files:
monetdb5/extras/crackers/crackers_multicore_unordered.mx
monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:
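Implementation of the multicore crackTwo algorithm (work in progress): add a parallel reorganize phase and call the parallel scan from the crack-in-two macros.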
diffs (287 lines):
diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
--- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
@@ -48,8 +48,8 @@ typedef struct {
int *temp_array;
int *data_less;
int *data_greater;
- oid* left;
- oid* right;
+ oid left;
+ oid right;
int tid;
int *pivot;
} thread_data_t;
@@ -61,8 +61,8 @@ typedef struct {
int *data_greater;
int nthreads; /*number of threads*/
int n; /*number of array elements*/
- oid* left;
- oid* right;
+ oid left;
+ oid right;
int tid;
int *pivot;
} thread_data_new_t;
@@ -95,7 +95,9 @@ crackers_export str CRKcrackUnorderedThr
@= crackInTwoUnorderedPieces_decl
str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last,
oid *pos);
str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1 *mval, oid
first, oid last, int nthreads, int *data_less, int *data_greater);
+str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, @1
*mval, oid first, oid last, int nthreads, int *data_less, int *data_greater);
void *threadFuncScan_@2_@1(void *arg);
+void *threadFuncReorganize_@2_@1(void *arg);
@
@= crackInThreeUnorderedPieces_decl
str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid
first, oid last, oid *posl, oid *posh);
@@ -222,7 +224,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
int i;
oid partition_elements;
- oid *lh;
thread_data_t *data;
pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) *
nthreads);
@@ -231,7 +232,6 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
data = malloc (nthreads * sizeof(thread_data_t));
- lh = (oid*)Hloc(b, BUNfirst(b) + last);
partition_elements=(last-first+1)/nthreads;
for (i = 0; i < nthreads; i++) {
@@ -239,12 +239,12 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
data[i].temp_array = temp_array;
data[i].data_less = data_less;
data[i].data_greater = data_greater;
- data[i].left = (oid*)Hloc(b, BUNfirst(b) +
i*partition_elements);
+ data[i].left = first + i*partition_elements;
data[i].pivot = mval;
if(i==(nthreads-1))
- data[i].right = lh;
+ data[i].right = last;
else
- data[i].right = (oid*)Hloc(b, BUNfirst(b) +
(i*partition_elements)+partition_elements-1);
+ data[i].right = first +
(i*partition_elements)+partition_elements-1;
data[i].tid = i;
@@ -270,20 +270,123 @@ CRKscanUnorderedZeroParallel_@2_@1(BAT *
return MAL_SUCCEED;
}
+str
+CRKreorganizeUnorderedZeroParallel_@2_@1(BAT *b, @1 *temp_array, @1 *mval, oid first, oid last, int nthreads, int *data_less, int *data_greater) {
+	/*
+	 * Parallel reorganize phase of crack-in-two over rows [first, last]
+	 * (both inclusive) of b.  The range is split into nthreads contiguous
+	 * partitions (the last one absorbs the division remainder), using the
+	 * same formula as the scan phase so that thread tid here sees the
+	 * partition whose counts are in data_less[tid]/data_greater[tid].
+	 * Each partition is handed to threadFuncReorganize_@2_@1.
+	 * Returns MAL_SUCCEED; allocation, create and join failures print a
+	 * message and terminate the process, as the thread error handling in
+	 * this function already did.
+	 */
+	int i;
+	oid partition_elements;
+	thread_data_new_t *data;
+	pthread_t *thread;
+
+	/* no partitions to run; also avoids dividing by zero below */
+	if (nthreads <= 0)
+		return MAL_SUCCEED;
+
+	/* one contiguous array each for thread handles and thread arguments */
+	thread = malloc(nthreads * sizeof(*thread));
+	data = malloc(nthreads * sizeof(*data));
+	if (thread == NULL || data == NULL) {
+		printf("Failed to allocate memory for %d threads.\n", nthreads);
+		exit(-1);
+	}
+
+	partition_elements = ((last - first) + 1) / nthreads;
+	for (i = 0; i < nthreads; i++) {
+		data[i].array = b;
+		/* NOTE(review): field is int * while temp_array/mval are @1 *; confirm for non-int types */
+		data[i].temp_array = temp_array;
+		data[i].data_less = data_less;
+		data[i].data_greater = data_greater;
+		data[i].n = last - first + 1;
+		data[i].nthreads = nthreads;
+		data[i].left = first + i * partition_elements;
+		/* the last partition absorbs the remainder of the division */
+		if (i == nthreads - 1)
+			data[i].right = last;
+		else
+			data[i].right = first + (i * partition_elements) + partition_elements - 1;
+		data[i].pivot = mval;
+		data[i].tid = i;
+
+		if (pthread_create(&thread[i], NULL, threadFuncReorganize_@2_@1, &data[i])) {
+			printf("Failed to create thread %d.\n", i);
+			exit(-1);
+		}
+	}
+
+	/* all workers must finish before their shared arguments are released */
+	for (i = 0; i < nthreads; i++) {
+		if (pthread_join(thread[i], NULL)) {
+			printf("Error joining thread %d.\n", i);
+			exit(-1);
+		}
+	}
+
+	free(thread);
+	free(data);
+	return MAL_SUCCEED;
+}
+
void
*threadFuncScan_@2_@1(void *arg) {
- oid *i;
- thread_data_t *my_data = (thread_data_t *) arg;
- for(i=my_data->left; i <= my_data->right; i++)
- {
- if(((@1 *)Tloc(my_data->array, BUNfirst(my_data->array))) >
my_data->pivot)
- my_data->data_greater[my_data->tid]++;
- else
- my_data->data_less[my_data->tid]++;
+	/*
+	 * Scan-phase worker for one partition.  It walks the tail values of
+	 * rows [left, right] (both inclusive).  It stores in data_greater[tid]
+	 * the number of values for which the @5_@3 comparison against pivot
+	 * holds, and in data_less[tid] the number for which it does not.
+	 * Each thread writes only its own tid slot.
+	 */
+	thread_data_t *my_data = (thread_data_t *) arg;
+	@1 *ft, *lt;
+	int nless = 0, ngreater = 0;
+
+	/* inclusive bounds for the iterator */
+	ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->left);
+	lt = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->right);
+
+	/* <= because right is the last row of the partition, not one past it */
+	for (; ft <= lt; ft++) {
+		if (@5_@3(ft, my_data->pivot,@6@1))
+			ngreater++;
+		else
+			nless++;
}
+
+	/* assign rather than increment: the visible callers allocate the
+	 * count arrays with malloc and never zero them */
+	my_data->data_greater[my_data->tid] = ngreater;
+	my_data->data_less[my_data->tid] = nless;
pthread_exit(NULL);
}
+
+void *threadFuncReorganize_@2_@1(void *arg) {
+	/*
+	 * Reorganize-phase worker for one partition [left, right] (both
+	 * inclusive).  It derives this thread's output cursors in temp_array
+	 * from the scan-phase counts:
+	 *  - "less" values start after those of threads 0..tid-1, counted
+	 *    from index 0;
+	 *  - "greater" values start at n minus the greater-counts of
+	 *    threads 0..tid.
+	 * NOTE(review): the stores into temp_array are still disabled, so this
+	 * worker currently only advances the cursors.
+	 */
+	thread_data_new_t *my_data = (thread_data_new_t *) arg;
+	int i;
+	int positionLess = 0;
+	int positionGreater = my_data->n;
+	@1 *ft, *lt;
+
+	/* inclusive bounds for the iterator */
+	ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->left);
+	lt = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) + my_data->right);
+
+	/* prefix sums over the preceding threads' counts */
+	for (i = 0; i < my_data->nthreads && i < my_data->tid; i++)
+		positionLess += my_data->data_less[i];
+	for (i = 0; i < my_data->nthreads && i <= my_data->tid; i++)
+		positionGreater -= my_data->data_greater[i];
+
+	/* <= because right is the last row of the partition, not one past it */
+	for (; ft <= lt; ft++) {
+		if (@5_@3(ft, my_data->pivot,@6@1)) {
+			/* TODO: my_data->temp_array[positionGreater] = *ft; */
+			positionGreater++;
+		} else {
+			/* TODO: my_data->temp_array[positionLess] = *ft; */
+			positionLess++;
+		}
+	}
+
+	pthread_exit(NULL);
+}
@
@= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
--- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
+++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
@@ -132,10 +132,16 @@ CRKparalleluselectBounds_@1(int *vid, in
@= crkTwoLTree
/*CRACK in two pieces cl1-ch1 using >incLow bound*/
if (*inclusiveLow == TRUE)
+ {
CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
+ CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, low, cl1, ch1,
nthreads, p_data_less, p_data_greater);
+ }
else
+ {
CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
-
+ CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, low, cl1, ch1,
nthreads, p_data_less, p_data_greater);
+ }
+
if (vl < cl1){
/*then the left piece is empty*/
gapL = -1;
@@ -152,9 +158,16 @@ CRKparalleluselectBounds_@1(int *vid, in
@= crkTwoRTree
/*CRACK in two pieces cl2-ch2 using <incHgh bound*/
if (*inclusiveHgh == TRUE)
+ {
CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
+ CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, hgh, cl2, ch2,
nthreads, p_data_less, p_data_greater);
+ }
else
+ {
CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
+ CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, hgh, cl2, ch2,
nthreads, p_data_less, p_data_greater);
+ }
+
/*check for gaps*/
if (vh < cl2)
/*then the left piece is empty*/
@@ -203,7 +216,18 @@ createView:
BUN idxFirst;
bit copy=TRUE;
+ int *p_new = NULL;
+ int *p_data_less = NULL;
+ int *p_data_greater = NULL;
+ int nthreads=2;
+
+
int pieces=0;
+
+ p_data_less=(int *)malloc(nthreads * sizeof(int));
+ p_data_greater=(int *)malloc(nthreads * sizeof(int));
+
+
/*FILE *ofp;
char outputFilename1[] =
"/export/scratch2/petraki/experiments_1st_paper/experiments/stochastic/idle_time_2/pieces_cracking.txt";
ofp = fopen(outputFilename1,"a");
@@ -820,6 +844,14 @@ CRKRangeLeftNilTree_@1(int *vid, int *bi
oid cl2=0, ch2=0;
bit HBound,foundHgh=0;
int gapH = 1;
+
+ int *p_new = NULL;
+ int *p_data_less = NULL;
+ int *p_data_greater = NULL;
+ int nthreads=2;
+
+ p_data_less=(int *)malloc(nthreads * sizeof(int));
+ p_data_greater=(int *)malloc(nthreads * sizeof(int));
if (*inclusiveHgh == TRUE) HBound = FALSE;
@@ -921,6 +953,15 @@ CRKRangeRightNilTree_@1(int *vid, int *b
int gapL = 1;
bit LBound=FALSE;
+ int *p_new = NULL;
+ int *p_data_less = NULL;
+ int *p_data_greater = NULL;
+ int nthreads=2;
+
+ p_data_less=(int *)malloc(nthreads * sizeof(int));
+ p_data_greater=(int *)malloc(nthreads * sizeof(int));
+
+
m = existsCrackerIndex(*bid);
/* if this is the first time we parallelselect something from this bat,
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list