The second question is as follows:
(2) Persistent communication produces incorrect results after a checkpoint is taken.
In persistent communication, if a checkpoint is taken after the sender has
called MPI_Start but before the receiver has called MPI_Start, the receiver
gets wrong data.
Framework: crcp
Component: bkmrk
Source file: ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c
Function: drain_message_copy_remove_persistent
Here is the code that triggers the problem:
/*
 * Excerpt of the reproducer's per-rank logic (declarations, MPI_Init and the
 * allocation of buf are omitted).  Run with 2 ranks and take the checkpoint
 * while both ranks are inside sleep().
 */
#define BLOCKNUM 1
#define SLPTIM 60
if (rank == 0) {
/* rank 0: persistent send, started BEFORE the checkpoint */
MPI_Send_init(&buf[0],BLOCKNUM,MPI_INT,1,100,MPI_COMM_WORLD,&req1);
for (i=0;i<BLOCKNUM;i++) { buf[i] = (1000+i); }
MPI_Start(&req1);
printf(" rank=%d sleep1 start \n",rank); fflush(stdout);
sleep(SLPTIM); /** take checkpoint at this point **/
printf(" rank=%d sleep1 end \n",rank); fflush(stdout);
MPI_Wait(&req1,&sts1);
MPI_Request_free(&req1);
} else { /* rank 1 */
/* persistent receive, created now but started only AFTER the checkpoint */
MPI_Recv_init(&buf[0],BLOCKNUM,MPI_INT,0,100,MPI_COMM_WORLD,&req1);
printf(" rank=%d sleep1 start \n",rank); fflush(stdout);
sleep(SLPTIM); /** take checkpoint at this point **/
printf(" rank=%d sleep1 end \n",rank); fflush(stdout);
/* clear buf so only data actually received can pass the check below */
for (i=0;i<BLOCKNUM;i++) { buf[i] = 0; }
MPI_Start(&req1); MPI_Wait(&req1,&sts1);
printf(" rank=%d pass-3 %d \n",rank,buf[0]); fflush(stdout);
/* every element must equal the value rank 0 stored: 1000 + i */
for (i=0;i<BLOCKNUM;i++) { if(buf[i] != (1000+i)) {
MPI_Abort(MPI_COMM_WORLD,1); } } /* The error occurs at this point */
MPI_Request_free(&req1);
}
* Take a checkpoint while processes 0 and 1 are both inside sleep().
* The received data then differs from the sent data, so MPI_Abort is called.
* I think the wrong datatype (ompi_datatype_t) is passed as the first argument
of ompi_ddt_copy_content_same_ddt() in drain_message_copy_remove_persistent().
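* This is because the datatype passed there is the one used for the temporary
receive posted by the checkpoint operation, and that datatype is MPI_BYTE.
If it is combined with the user's element count, presumably only that many
bytes, rather than that many MPI_INT elements, are copied into the user buffer.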
* I think the datatype that should be used is the one the user specified,
which is reachable through the "ompi_request_t *request" argument.
In ompi_crcp_bkmrk_pml_start_drain_irecv_init(), the user's datatype appears
to be obtained as follows:
static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t
**request, bool *found_drain)
breq = (mca_pml_base_request_t *)(*request);
tmp_ddt_size = (breq->req_datatype)->size;
-bash-3.2$ cat t_mpi_question-2.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "mpi.h"
#define BLOCKNUM 1
#define SLPTIM 60
/*
 * Reproducer: wrong data is received on a persistent send/receive pair when
 * a checkpoint is taken after the sender's MPI_Start but before the
 * receiver's MPI_Start.
 *
 * Must be run with exactly 2 processes; any other size aborts (rank 0 prints
 * the reason first).
 *
 * rank 0: MPI_Send_init, fill buf with BASE_VALUE + i, MPI_Start, sleep
 *         SLPTIM seconds, then MPI_Wait and free the request.
 * rank 1: MPI_Recv_init, sleep SLPTIM seconds, zero buf, MPI_Start and
 *         MPI_Wait, then check buf[i] == BASE_VALUE + i.  On the first
 *         mismatch it reports the offending element on stderr and calls
 *         MPI_Abort with error code 1.
 *
 * The checkpoint is meant to be taken while both ranks are inside sleep().
 * Returns 0 when the received data is correct.
 */
int main(int ac,char **av)
{
    /* Message tag shared by both sides, and base of the payload values. */
    enum { MSG_TAG = 100, BASE_VALUE = 1000 };
    int i;
    int rank,size;
    int *buf;
    MPI_Status sts1;
    MPI_Request req1;

    MPI_Init(&ac,&av);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);
    if (size != 2) {
        if (rank == 0) {
            fprintf(stderr, " this test needs exactly 2 processes (got %d) \n", size);
            fflush(stderr);
        }
        MPI_Abort(MPI_COMM_WORLD,-1);
    }

    /* No cast in C; sizeof *buf keeps the size tied to buf's element type. */
    buf = malloc(BLOCKNUM * sizeof *buf);
    if (buf == NULL) {
        MPI_Abort(MPI_COMM_WORLD,-1);
    }
    MPI_Barrier(MPI_COMM_WORLD);

    if (rank == 0) {
        MPI_Send_init(&buf[0],BLOCKNUM,MPI_INT,1,MSG_TAG,MPI_COMM_WORLD,&req1);
        /* Payload is written between MPI_Send_init and MPI_Start. */
        for (i = 0; i < BLOCKNUM; i++) {
            buf[i] = BASE_VALUE + i;
        }
        MPI_Start(&req1);
        printf(" rank=%d sleep1 start \n",rank); fflush(stdout);
        sleep(SLPTIM);          /* take checkpoint at this point */
        printf(" rank=%d sleep1 end \n",rank); fflush(stdout);
        MPI_Wait(&req1,&sts1);
        MPI_Request_free(&req1);
    } else {                    /* rank 1 (size is known to be 2) */
        MPI_Recv_init(&buf[0],BLOCKNUM,MPI_INT,0,MSG_TAG,MPI_COMM_WORLD,&req1);
        printf(" rank=%d sleep1 start \n",rank); fflush(stdout);
        sleep(SLPTIM);          /* take checkpoint at this point */
        printf(" rank=%d sleep1 end \n",rank); fflush(stdout);
        /* Clear the (uninitialized) buffer so only received data can match. */
        for (i = 0; i < BLOCKNUM; i++) {
            buf[i] = 0;
        }
        MPI_Start(&req1);
        MPI_Wait(&req1,&sts1);
        printf(" rank=%d pass-3 %d \n",rank,buf[0]); fflush(stdout);
        for (i = 0; i < BLOCKNUM; i++) {
            if (buf[i] != BASE_VALUE + i) {
                /* Report what went wrong before aborting: this is the bug. */
                fprintf(stderr, " rank=%d mismatch at buf[%d]: expected %d got %d \n",
                        rank, i, BASE_VALUE + i, buf[i]);
                fflush(stderr);
                MPI_Abort(MPI_COMM_WORLD,1);
            }
        }
        MPI_Request_free(&req1);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    free(buf);
    MPI_Finalize();
    if (rank == 0) {
        printf(" rank=%d Program End \n",rank); fflush(stdout);
    }
    return(0);
}