Changeset: d5294af6e7cb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d5294af6e7cb
Modified Files:
monetdb5/extras/crackers/crackers_multicore_unordered.mx
monetdb5/extras/crackers/crackers_parallelselect_ops.mx
Branch: holindex
Log Message:
Add restore fuction for multicore cracking.
diffs (truncated from 353 to 300 lines):
diff --git a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
--- a/monetdb5/extras/crackers/crackers_multicore_unordered.mx
+++ b/monetdb5/extras/crackers/crackers_multicore_unordered.mx
@@ -69,6 +69,17 @@ typedef struct {
int pivot;
} thread_data_new_t;
+typedef struct {
+ BAT *array;
+ int *temp_array;
+ oid *temp_array_oid;
+ int nthreads; /*number of threads*/
+ oid first;
+ oid last;
+ int tempfirst;
+ int templast;
+ int tid;
+} thread_data_restore_t;
/* Signatures shared within the crackers module/library */
@:TypeSwitch(operations,_decl)@
@@ -97,9 +108,11 @@ crackers_export str CRKcrackUnorderedThr
@= crackInTwoUnorderedPieces_decl
str CRKcrackUnorderedZeroParallel_@2_@1( BAT *b, @1 mval, oid first, oid last,
oid *pos);
str CRKscanUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less,
int *data_greater);
-str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid
*temp_arra_oid, @1 mval, oid first, oid last, int nthreads, int *data_less, int
*data_greater);
+str CRKreorganizeUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less,
int *data_greater);
+str CRKrestoreUnorderedZeroParallel_@2_@1( BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 mval, oid first, oid last, int nthreads, int *data_less,
int *data_greater, oid *pos);
void *threadFuncScan_@2_@1(void *arg);
void *threadFuncReorganize_@2_@1(void *arg);
+void *threadFuncRestore_@2_@1(void *arg);
@
@= crackInThreeUnorderedPieces_decl
str CRKcrackUnorderedThreeParallel_@2_@3_@1( BAT *b, @1 low, @1 hgh, oid
first, oid last, oid *posl, oid *posh);
@@ -332,6 +345,90 @@ CRKreorganizeUnorderedZeroParallel_@2_@1
return MAL_SUCCEED;
}
+str
+CRKrestoreUnorderedZeroParallel_@2_@1(BAT *b, @1 *temp_array, oid
*temp_array_oid, @1 mval, oid first, oid last, int nthreads,int *data_less, int
*data_greater,oid *pos) {
+
+ int i;
+ @1 *lt, *t0;
+ oid partition_elements, position=0;
+ thread_data_restore_t *data;
+
+ pthread_t **thread = (pthread_t **) malloc (sizeof(pthread_t *) *
nthreads);
+
+ (void) data_greater;
+
+ /* Allocate memory for the pthread_ts. */
+ for (i = 0; i < nthreads; i++)
+ thread[i] = (pthread_t *) malloc (sizeof(pthread_t));
+
+ /* Allocate memory for thread data. */
+ data = malloc (nthreads * sizeof(thread_data_restore_t));
+
+ /* Now create the threads. */
+ partition_elements=(last-first+1)/nthreads;
+ //printf("Each partition consists of %d elements
(almost).\n",partition_elements);
+ for (i = 0; i < nthreads; i++) {
+
+ data[i].array = b;
+ data[i].temp_array = temp_array;
+ data[i].temp_array_oid = temp_array_oid;
+ data[i].nthreads = nthreads;
+ data[i].first = first + i*partition_elements;
+ data[i].tempfirst = i*partition_elements;
+ if(i==(nthreads-1))
+ {
+ data[i].last = last;
+ data[i].templast = partition_elements-1;
+ }
+ else
+ {
+ data[i].last = first +
(i*partition_elements)+partition_elements-1;
+ data[i].templast =
(i*partition_elements)+partition_elements-1;
+ }
+ data[i].tid = i;
+ if (pthread_create (thread[i], NULL, threadFuncRestore_@2_@1,
&data[i]))
+ {
+ printf("Failed to create thread %d.\n",i);
+ exit (-1);
+ }
+ }
+ // Wait for threads
+ for (i = 0; i < nthreads; i++) {
+ if (pthread_join(*thread[i], NULL)) {
+ printf ("Error joining thread %d.\n", i);
+ exit (-1);
+ }
+ }
+
+ /* Clean up. */
+ for (i = 0; i < nthreads; i++)
+ free (thread[i]);
+
+ free (thread);
+
+ for (i = 0; i < nthreads; i++)
+ position=position+data_less[i];
+
+ t0 = (@1 *)Tloc(b, BUNfirst(b));
+ lt = (@1 *)Tloc(b, BUNfirst(b) + first + position-1);
+// *pos=(oid) (lt-t0);
+
+ if (@5_@4(lt, &mval,@6@1)){
+ if (lt==t0)
+ *pos = (oid) BUNfirst(b);
+ else
+ *pos = (oid) (lt - t0) - 1; /*works for empty left
piece also*/
+ }
+ else{
+ *pos = (oid) (lt - t0);
+ if (*pos==last) /*empty right piece*/
+ *pos = *pos + 1;
+ }
+
+
+ return MAL_SUCCEED;
+}
+
void
*threadFuncScan_@2_@1(void *arg) {
@@ -393,6 +490,27 @@ void *threadFuncReorganize_@2_@1(void *a
pthread_exit(NULL);
}
+
+void *threadFuncRestore_@2_@1(void *arg) {
+ thread_data_restore_t *my_data = (thread_data_restore_t *) arg;
+ @1 *ft;
+ oid *fh, *lh;
+
+ /* set bounds for the iterator */
+ ft = (@1 *)Tloc(my_data->array, BUNfirst(my_data->array) +
my_data->first);
+ fh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) +
my_data->first);
+ lh = (oid*)Hloc(my_data->array, BUNfirst(my_data->array) +
my_data->last);
+
+ while(fh<=lh) {
+ *ft=my_data->temp_array[my_data->tempfirst];
+ *fh=my_data->temp_array_oid[my_data->tempfirst];
+ my_data->tempfirst++;
+ ft++; fh++;
+ }
+
+ pthread_exit(NULL);
+}
+
@
@= crackInThreeUnorderedPieces_impl
diff --git a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
--- a/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
+++ b/monetdb5/extras/crackers/crackers_parallelselect_ops.mx
@@ -133,19 +133,21 @@ CRKparalleluselectBounds_@1(int *vid, in
/*CRACK in two pieces cl1-ch1 using >incLow bound*/
if (*inclusiveLow == TRUE)
{
- CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
- CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *low,
cl1, ch1, nthreads, p_data_less, p_data_greater);
- p_new=(int *)malloc((ch1-cl1) * sizeof(int));
- p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
- CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*low, cl1, ch1,&vl);
+ CRKscanUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ pl_new=(int *)malloc((ch1-cl1) * sizeof(int));
+ pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
+ CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pl_new,
pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pl_new, pl_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl);
}
else
{
- CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
- CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *low,
cl1, ch1, nthreads, p_data_less, p_data_greater);
- p_new=(int *)malloc((ch1-cl1) * sizeof(int));
- p_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
- CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*low, cl1, ch1,&vl);
+ CRKscanUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ pl_new=(int *)malloc((ch1-cl1) * sizeof(int));
+ pl_new_oid=(oid *)malloc((ch1-cl1) * sizeof(oid));
+ CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pl_new,
pl_new_oid, *low, cl1, ch1, nthreads, p_data_less, p_data_greater);
+ CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pl_new, pl_new_oid,
*low, cl1, ch1, nthreads, p_data_less, p_data_greater, &vl);
}
if (vl < cl1){
@@ -165,19 +167,21 @@ CRKparalleluselectBounds_@1(int *vid, in
/*CRACK in two pieces cl2-ch2 using <incHgh bound*/
if (*inclusiveHgh == TRUE)
{
- CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
- CRKscanUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid, *hgh,
cl2, ch2, nthreads, p_data_less, p_data_greater);
- p_new=(int *)malloc((ch2-cl2) * sizeof(int));
- p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
- CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, p_new, p_new_oid,
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
- }
+ //CRKcrackUnorderedZeroParallel@2_LE_@1(b,*hgh, cl2, ch2,&vh);
+ CRKscanUnorderedZeroParallel@2_LE_@1(b, pr_new, pr_new_oid,
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+ pr_new=(int *)malloc((ch2-cl2) * sizeof(int));
+ pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
+ CRKreorganizeUnorderedZeroParallel@2_LE_@1(b, pr_new,
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+ CRKrestoreUnorderedZeroParallel@2_LE_@1(b, pr_new,
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh);
+ }
else
{
- CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
- CRKscanUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid, *hgh,
cl2, ch2, nthreads, p_data_less, p_data_greater);
- p_new=(int *)malloc((ch2-cl2) * sizeof(int));
- p_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
- CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, p_new, p_new_oid,
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+ //CRKcrackUnorderedZeroParallel@2_RE_@1(b,*hgh, cl2, ch2,&vh);
+ CRKscanUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid,
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+ pr_new=(int *)malloc((ch2-cl2) * sizeof(int));
+ pr_new_oid=(oid *)malloc((ch2-cl2) * sizeof(oid));
+ CRKreorganizeUnorderedZeroParallel@2_RE_@1(b, pr_new,
pr_new_oid, *hgh, cl2, ch2, nthreads, p_data_less, p_data_greater);
+ CRKrestoreUnorderedZeroParallel@2_RE_@1(b, pr_new, pr_new_oid,
*hgh, cl2, ch2, nthreads, p_data_less, p_data_greater, &vh);
}
/*check for gaps*/
@@ -189,7 +193,6 @@ CRKparalleluselectBounds_@1(int *vid, in
gapH = -1;
/*vh--;*/
}
-
@
@= CreateResult
createView:
@@ -227,9 +230,12 @@ createView:
struct Node *lowNode=NULL, *hghNode=NULL, *lowNodeNext=NULL, *temp;
BUN idxFirst;
bit copy=TRUE;
+ int i;
- int *p_new = NULL;
- oid *p_new_oid = NULL;
+ int *pl_new = NULL;
+ oid *pl_new_oid = NULL;
+ int *pr_new = NULL;
+ oid *pr_new_oid = NULL;
int *p_data_less = NULL;
int *p_data_greater = NULL;
int nthreads=2;
@@ -486,34 +492,44 @@ createView:
index then we have to crack */
if (foundLow == 0 || foundHgh == 0) {
if (foundLow == 0 && foundHgh == 0) {
- /* We have to do two cracks separatelly */
+ /* We have to do two cracks separatelly */
- /* For the cl bound and the next one in the index*/
- @:crkTwoLTree@5(@1,@5)@
- t = (int *) Tloc(b, BUNfirst(b));
+ /* For the cl bound and the next one in the index*/
+ @:crkTwoLTree@5(@1,@5)@
+ t = (int *) Tloc(b, BUNfirst(b));
- /* For the ch bound and the previous one in the index*/
- @:crkTwoRTree@5(@1,@5)@
+ foundHgh = GetHgh_@1(*hgh, *inclusiveHgh,
CrackerIndex[m].Tree, c, BUNfirst(c), &cl2, &ch2, 0, BUNlast(b)-(oid)1);
+ if (cl2 != 0) cl2++;
+ for(i=0;i<nthreads;i++)
+ {
+ p_data_less[i]=0;
+ p_data_greater[i]=0;
+ }
- if (IndexSize < IndexStop) {
- if (vl > 0)
- _vl = vl - 1;
- else
- _vl = vl;
- if (gapL>0)
- {
- addCrackerIndex_@1(m, low,
*inclusiveLow, _vl, c);
- pieces = pieces + 1;
- }
- if (gapH>0)
- {
- addCrackerIndex_@1(m, hgh, HBound, vh,
c);
- pieces = pieces + 1;
- }
- if ((vl == 1) && (*t == *low) && (*inclusiveLow
== TRUE))
- vl = vl - 1;
- }
- } else if (foundLow == 0) {
+
+ /* For the ch bound and the previous one in the index*/
+ @:crkTwoRTree@5(@1,@5)@
+
+ if (IndexSize < IndexStop) {
+ if (vl > 0)
+ _vl = vl - 1;
+ else
+ _vl = vl;
+ if (gapL>0)
+ {
+ addCrackerIndex_@1(m, low,
*inclusiveLow, _vl, c);
+ pieces = pieces + 1;
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list