Hi, I initiated this thread on the LAM mailing list, but as Brian Barrett suggested, I am now reporting my results here. Hope this is of interest!
My motivation is to get good performance out of the MPI_Alltoall routine on our Gigabit Ethernet clusters. Most users here run the GROMACS molecular dynamics code, and MPI_Alltoall turned out to be one of its main scaling bottlenecks on Ethernet, at least when flow control is not enabled; in that case the code essentially does not scale beyond two nodes.

I wrote a test program that performs MPI_Alltoall communication for varying message sizes (see attachment) to find out where congestion occurs. I originally ran it with LAM 7.1.1; those results are in LAM.ps. The plots show the transmission rate per MPI process as a function of the message size. Left column: single-CPU nodes, right column: dual-CPU nodes; top row: without flow control, middle row: with flow control. (The horizontal broken line indicates the maximum Ethernet throughput.)

Because even with flow control there is congestion for 16+ nodes, I tried ordered communication schemes, which should in principle avoid congestion completely. The result for a simple Sendrecv-based scheme for single-CPU nodes is shown in the lower left plot, the result for a more complex Isend/Irecv scheme for multi-CPU nodes in the lower right plot. Hardware flow control was disabled for the ordered schemes. The nice thing is that there is no congestion any more, but because of the barriers in the code, the transmission rates of the ordered all-to-alls stay below those of MPI_Alltoall as long as MPI_Alltoall itself does not suffer from congestion. As soon as MPI_Alltoall shows congestion, the ordered all-to-all clearly wins.

As Brian suggested, I repeated my tests with Open MPI 1.0, see OpenMPI.ps. The Open MPI MPI_Alltoall shows less congestion than the LAM MPI_Alltoall, both with and without flow control. Unfortunately, the ordered routines perform even worse than in the LAM case. Here are some numbers for the transmission rate T per CPU:

Table 1. Message size = 2 MB, 4 CPUs ("limit for large messages"):

CPUs/     LAM                               Open MPI
node      MPI_Alltoall    own all-to-all    MPI_Alltoall    own all-to-all
          (flow contr)    (no flow c.)      (flow contr)    (no flow c.)
===========================================================================
  1       68.2 MB/s       69.6 MB/s         64.0 MB/s       64.0 MB/s
  2       48.9 MB/s       45.3 MB/s         48.7 MB/s       37.4 MB/s

Table 2. Message size = 1024 bytes, 32 CPUs (note: here MPI_Alltoall does
not show congestion when flow control is enabled):

CPUs/     LAM                               Open MPI
node      MPI_Alltoall    own all-to-all    MPI_Alltoall    own all-to-all
          (flow contr)    (no flow c.)      (flow contr)    (no flow c.)
===========================================================================
  1       20.9 MB/s        2.8 MB/s         18.2 MB/s        1.3 MB/s
  2       14.3 MB/s        5.0 MB/s         14.3 MB/s        2.7 MB/s

While for large messages the transmission rates of MPI_Alltoall and the ordered all-to-alls become comparable, for smaller message sizes the ordered routines perform worse, due to the barriers between the communication phases. What I do not understand is why the transmission rate of the ordered all-to-alls depends so heavily on the number of CPUs in the Open MPI case, but not in the LAM case. Could the barrier synchronization times in Open MPI be longer than in LAM? (A small sketch for timing this directly follows below.) Any ideas on how to raise the throughput without risking congestion are welcome!
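Regarding the barrier question: here is a minimal sketch of how the bare MPI_Barrier cost could be measured separately from the attached test program (the repetition count is chosen arbitrarily):

#include <stdio.h>
#include <mpi.h>

int main(int argc, char *argv[])
{
    const int reps = 1000;   /* arbitrary number of repetitions */
    int       i, rank, nprocs;
    double    t0, t1;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    MPI_Barrier(MPI_COMM_WORLD);          /* synchronize the start */
    t0 = MPI_Wtime();
    for (i = 0; i < reps; i++)
        MPI_Barrier(MPI_COMM_WORLD);      /* the barriers being timed */
    t1 = MPI_Wtime();

    if (rank == 0)
        printf("%d CPUs: %f microseconds per MPI_Barrier\n",
               nprocs, (t1 - t0)/reps * 1.0e6);

    MPI_Finalize();
    return 0;
}

Comparing the per-barrier times of LAM and Open MPI for the CPU counts above should show whether the barriers alone explain the difference.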
Regards,
  Carsten

---------------------------------------------------
Dr. Carsten Kutzner
Max Planck Institute for Biophysical Chemistry
Theoretical and Computational Biophysics Department
Am Fassberg 11, 37077 Goettingen, Germany
Tel. +49-551-2012313, Fax: +49-551-2012302
eMail ckut...@gwdg.de
http://www.gwdg.de/~ckutzne

---------- Forwarded message ----------
Date: Wed, 2 Nov 2005 08:23:15 -0500
From: Brian Barrett <brbar...@lam-mpi.org>
Reply-To: General LAM/MPI mailing list <l...@lam-mpi.org>
To: General LAM/MPI mailing list <l...@lam-mpi.org>
Subject: Re: LAM: MPI_Alltoall performance and congestion

On Nov 2, 2005, at 7:44 AM, Carsten Kutzner wrote:

> Will a new all-to-all routine be implemented in a future version
> of LAM / OpenMPI? I am willing to contribute my code as well
> if there is interest.

We will probably not be doing any more work on LAM/MPI's collective
routines.  While clearly not optimal for all cases (as you have
experienced), they do appear to be correct.  At this point, we're
hesitant to do anything to regress from a correctness standpoint.

However, we are actively working on improving collective performance
in Open MPI.  Our collective setup in Open MPI is a bit different than
the one in LAM, and some of the algorithms are already quite different.
The FT-MPI team from the University of Tennessee is also working on
some new routines that should give better performance in a wider range
of scenarios.  We would be happy to have contributions that help
improve performance in certain situations - if nothing else, it gives
us a good reference point for our work.

If you are interested, I would highly recommend trying out one of the
Open MPI release candidates, subscribing to the Open MPI developer's
mailing list, and letting us know.  The Open MPI web page is, of
course, http://www.open-mpi.org/

Thanks,

Brian

--
  Brian Barrett
  LAM/MPI developer and all around nice guy
  Have a LAM/MPI day: http://www.lam-mpi.org/

_______________________________________________
This list is archived at http://www.lam-mpi.org/MailArchives/lam/
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <math.h>

/* #define USE_MPE */

#ifdef USE_MPE
#include "/home/ckutzne/mpe_for_lam711/include/mpe.h"
#define GMX_MPE_LOG(event) MPE_Log_event(event, 0, "")
#else
#define GMX_MPE_LOG(event)
#endif

/* Max number of floats to send to each process */
/* category: 1  2  3  4   5   6   7    8    9   10    11    12    13    14     15     16     17      18      19      20 */
/* floats:   1  2  4  8  16  32  64  128  256  512  1024  2048  4096  8192  16384  32768  65536  131072  262144  524288 */
#define TOTAL      524288
#define CATEGORIES 20
#define BASE       2
#define OFFSET     0

/* Global variables */
int   nprocs, procid;
float rdata[32*TOTAL], sdata[32*TOTAL];   /* test 32 CPUs max. */
int   ev_mpi_getinfo_start, ev_mpi_getinfo_finish;


void initialize_data(float *d, float *r, int elements, int procs, int procid)
/* initialize data with test numbers */
{
    int i;

    for (i = 0; i < elements; i++)
    {
        d[i] = i + procid/10.;  /* data to send    */
        r[i] = 0.0;             /* for the results */
    }
}


void print_data(float *d, int elements, int procs, int procid, char *s)
/* prints the test data for each proc in an ordered fashion */
{
    int i, j;

    if (procid == 0)
        printf("\n%s\n", s);

    for (j = 0; j < procs; j++)
    {
        if (procid == j)
        {
            printf("proc %d (%d floats): ", procid, elements);
            for (i = 0; i < elements; i++)
                printf("%4.1f ", d[i]);
            printf("\n");
        }
        MPI_Barrier(MPI_COMM_WORLD);
    }
}


/* An own version of all-to-all communication that
 * avoids network congestion */
void GMX_Alltoall(float *sendbuf, int sendcount, MPI_Datatype sendtype,
                  float *recvbuf, int recvcount, MPI_Datatype recvtype,
                  MPI_Comm comm)
{
    MPI_Status status;
    int ncpu, cpuid, source, dest;
    int i;

    /* find out how many procs we have in our comm */
    MPI_Comm_size(comm, &ncpu);
    /* I am node number ... */
    MPI_Comm_rank(comm, &cpuid);

    for (i = 0; i < ncpu; i++)
    {
        /* send to dest = (cpuid + i) */
        dest   = (cpuid + i) % ncpu;
        /* receive from source = (cpuid - i) */
        source = (ncpu + cpuid - i) % ncpu;

        MPI_Sendrecv(sendbuf + dest  *sendcount, sendcount, sendtype, dest  , 0,
                     recvbuf + source*recvcount, recvcount, recvtype, source, 0,
                     comm, &status);

        /* if (sendcount > 1024*(8/nprocs)) */
        /* if ((i<ncpu-1) && (sendcount*sizeof(sendtype) > 256)) */
        if (i < ncpu-1)
            MPI_Barrier(comm);
    }
}


/* Ordered all-to-all for nodes with procs_pn CPUs each: node pairs are
 * scheduled as in GMX_Alltoall, the CPUs of a node pair exchange their
 * data with non-blocking Isend/Irecv */
void GMX_Alltoall_m(float *sendbuf, int sendcount, MPI_Datatype sendtype,
                    float *recvbuf, int recvcount, MPI_Datatype recvtype,
                    MPI_Comm comm)
{
    const int procs_pn = 2;
    MPI_Request sendrequests[procs_pn], recvrequests[procs_pn];
    MPI_Status  sendstatuses[procs_pn], recvstatuses[procs_pn];
    int ncpus , cpuid , destcpu , sourcecpu;
    int nnodes, nodeid, destnode, sourcenode;
    int i, j;

    /* find out how many procs we have in our comm */
    MPI_Comm_size(comm, &ncpus);
    /* I am node number ... */
    MPI_Comm_rank(comm, &cpuid);
    /* number of nodes */
    nnodes = ncpus / procs_pn;
    /* my nodeid */
    nodeid = cpuid / procs_pn;

    /* loop over nodes */
    for (i = 0; i < nnodes; i++)
    {
        destnode   = (         nodeid + i) % nnodes;  /* send to destination node */
        sourcenode = (nnodes + nodeid - i) % nnodes;  /* receive from source node */

        for (j = 0; j < procs_pn; j++)
        {
            sourcecpu = sourcenode*procs_pn + j;  /* source of data      */
            destcpu   = destnode  *procs_pn + j;  /* destination of data */

            MPI_Irecv(recvbuf + sourcecpu*recvcount, recvcount, recvtype,
                      sourcecpu, 0, comm, &recvrequests[j]);
            MPI_Isend(sendbuf + destcpu  *sendcount, sendcount, sendtype,
                      destcpu  , 0, comm, &sendrequests[j]);
        }
        MPI_Waitall(procs_pn, sendrequests, sendstatuses);
        MPI_Waitall(procs_pn, recvrequests, recvstatuses);

        if (i < nnodes-1)
            MPI_Barrier(comm);
    }
}


double do_test(int n, int iterations, int ev_start, int ev_finish, int comm_type)
{
    int i, procs_pn, nprocs, nnodes, myrank;
    double t_start, t_finish;
    char *t_string = " ";
    static int bFirst = 1;

    /* processors per node */
    procs_pn = 2;
    /* number of processors */
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    /* my rank */
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
    /* number of nodes */
    nnodes = nprocs / procs_pn;

    if (bFirst == 1)
    {
        bFirst = 0;
        initialize_data(sdata, rdata, nprocs*TOTAL, nprocs, myrank);
    }
    /* print_data(sdata, nprocs*TOTAL, nprocs, myrank, "Data initialized as"); */

    if (procid == 0)
    {
        if (comm_type == 0)
            t_string = "MPI";
        else
            t_string = "GMX";
        printf("%s: sending %7d floats (%8d bytes) to %d processes (%7d times) took ... ",
               t_string, n, (int)(n*sizeof(float)), nprocs, iterations);
        fflush(NULL);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    GMX_MPE_LOG(ev_start);
    t_start = MPI_Wtime();

    for (i = 0; i < iterations; i++)
    {
        if (comm_type == 0)
            MPI_Alltoall  (sdata, n, MPI_FLOAT, rdata, n, MPI_FLOAT, MPI_COMM_WORLD);
        else
            GMX_Alltoall_m(sdata, n, MPI_FLOAT, rdata, n, MPI_FLOAT, MPI_COMM_WORLD);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    GMX_MPE_LOG(ev_finish);
    t_finish = MPI_Wtime();

    if (procid == 0)
        printf("%10.5f seconds in total\n", t_finish - t_start);
    /* print_data(rdata, nprocs*TOTAL, nprocs, procid, "Data after alltoall:"); */

    return t_finish - t_start;
}


int main(int argc, char *argv[])
{
    int i, j, commtype;
    double t_res;                     /* timer resolution [seconds]            */
    const int tests = 25;             /* number of tests to average over       */
    double dummy, dum1;
    double sdev[CATEGORIES];          /* standard deviation for each category  */
    double t_comm[CATEGORIES][tests]; /* individual time for each test         */
    double t_aver[CATEGORIES];        /* average over number of tests          */
    double t_min[CATEGORIES], t_max[CATEGORIES]; /* shortest and longest time a test takes */
    /* output results in a file: */
    FILE *f_ptr;
    char filename[11];
    /* MPE events */
    int ev_start[CATEGORIES], ev_finish[CATEGORIES];
    int howmanyfloats[CATEGORIES];
#ifdef USE_MPE
    char *color[20] = { "orange1 ", "orange2 ", "orange3 ", "orange4 ",
                        "red1    ", "red2    ", "red3    ", "red4    ",
                        "magenta1", "magenta2", "magenta3", "magenta4",
                        "blue1   ", "blue2"   , "blue3"   , "blue4"   ,
                        "green1  ", "green2"  , "green3"  , "green4"  };
    char desc[] = "123456789012";
#endif

    /* initialize parallel universe */
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &procid);

#ifdef USE_MPE
    MPE_Init_log();
    /* Describe MPE states */
    for (i = 0; i < CATEGORIES; i++)
    {
        ev_start[i]  = MPE_Log_get_event_number();
        ev_finish[i] = MPE_Log_get_event_number();
        sprintf(desc, "Cat %8d", i);
        MPE_Describe_state(ev_start[i], ev_finish[i], desc, color[i]);
    }
    ev_mpi_getinfo_start  = MPE_Log_get_event_number();
    ev_mpi_getinfo_finish = MPE_Log_get_event_number();
    MPE_Describe_state(ev_mpi_getinfo_start, ev_mpi_getinfo_finish,
                       "get comm info", "grey50");
#endif

    t_res = MPI_Wtick();

    /* loop over both communication schemes */
    for (commtype = 0; commtype <= 1; commtype++)
    {
        if (procid == 0)
            printf("\nAlltoall Test on %d CPUs. %d repetitions.\n", nprocs, tests);

        /* Do the tests! */
        for (i = OFFSET; i < CATEGORIES; i++)
        {
            howmanyfloats[i] = pow(BASE, i);
            t_aver[i] = 0.0;
            for (j = 0; j < tests; j++)
            {
                t_comm[i][j] = do_test(howmanyfloats[i], 1, ev_start[i], ev_finish[i], commtype);
                t_aver[i] += t_comm[i][j];
                if (j == 0)
                {
                    t_min[i] = t_comm[i][j];
                    t_max[i] = t_comm[i][j];
                }
                else
                {
                    /* minimum time of a single test */
                    if (t_comm[i][j] < t_min[i]) t_min[i] = t_comm[i][j];
                    /* maximum time of a single test */
                    if (t_comm[i][j] > t_max[i]) t_max[i] = t_comm[i][j];
                }
            }
            t_aver[i] /= tests;
        }

        if (procid == 0)
        {
            printf("Summary (%d-run average, timer resolution %f):\n", tests, t_res);
            if (commtype == 0)
                sprintf(filename, "MPI_%2d_%2.2d", tests, nprocs);
            else
                sprintf(filename, "GMX_%2d_%2.2d", tests, nprocs);
            f_ptr = fopen(filename, "w");

            for (i = OFFSET; i < CATEGORIES; i++)
            {
                /* calc standard deviation for each category: */
                dummy = 0.0;
                for (j = 0; j < tests; j++)
                {
                    dum1   = t_comm[i][j] - t_aver[i];
                    dummy += dum1*dum1;
                }
                dummy  /= tests - 1;
                sdev[i] = sqrt(dummy);

                printf("%10d floats took %f (%f) seconds. Min: %f max: %f\n",
                       howmanyfloats[i], t_aver[i], sdev[i], t_min[i], t_max[i]);
                fprintf(f_ptr, "%4d %10d %f %f %f %f\n",
                        nprocs, howmanyfloats[i], t_aver[i], sdev[i], t_min[i], t_max[i]);
            }
            fclose(f_ptr);
        }
    } /* end of loop over communication schemes */

    MPI_Finalize();

    return 0;
}
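For reference, the test program should build and run with something like

    mpicc -O2 -o alltoall_test alltoall_test.c -lm
    mpirun -np 32 ./alltoall_test

where the source file name and the process count are just examples (under LAM, the universe has to be booted with lamboot first; for the MPE timelines, USE_MPE has to be defined and the MPE include/library paths adjusted).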
Attachment: LAM.ps (PostScript document)
Attachment: OpenMPI.ps (PostScript document)