Hi,

I initiated this thread on the LAM mailing list, but as Brian Barrett
suggested, I am now reporting my results here. Hope this is of interest!

My motivation is to get good performance out of the MPI_Alltoall routine on
our Gigabit Ethernet clusters. Most users here run the GROMACS molecular
dynamics code, and it turned out that MPI_Alltoall is one of the main
scaling bottlenecks on Ethernet, at least when flow control is not enabled.
In that case the code essentially does not scale beyond two compute nodes.

I wrote a test program that performs MPI_Alltoall communication for
varying message sizes (see attachment) to find out where congestion
occurs. I originally ran this with LAM 7.1.1; you can see the results
in LAM.ps. The plots show the transmission rate per MPI process as a
function of the message size. Left column: 1-CPU nodes, right column:
2-CPU nodes; top row: without flow control, middle row: with flow
control. (The horizontal broken line indicates the maximum Ethernet
throughput.)
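
(The attached program is plain C plus MPI and should build and run in the
usual way, e.g. "mpicc -O2 -o alltoall_test alltoall_test.c -lm" followed by
"mpirun -np 32 ./alltoall_test"; the file name and flags are only an example,
and -lm is needed for pow() and sqrt(). Note that the number of processes per
node, procs_pn, is hard-coded to 2 in the attached version.)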

Because even with flow control you get congestion for 16+ nodes, I tried
ordered communication schemes, which should in principle avoid congestion
completely. The result for a simple Sendrecv-based scheme for 1-CPU nodes
is shown in the lower left plot, and the result for a more complex
Isend/Irecv scheme for multi-CPU nodes in the lower right plot (both
schemes are contained in the attached program as GMX_Alltoall and
GMX_Alltoall_m). Hardware flow control was disabled for the ordered
schemes. The nice thing is that there is no congestion any more, but due
to the barriers in the code, the transmission rates you can reach with
the ordered all-to-alls are lower than with MPI_Alltoall as long as the
latter is free of congestion. Once MPI_Alltoall runs into congestion, the
ordered all-to-all clearly wins.
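
For illustration, with 4 processes on 4 single-CPU nodes the Sendrecv-based
scheme (GMX_Alltoall in the attached program) runs through the following
phases, with a barrier between consecutive phases (phase 0 is just the
local copy to oneself):

   phase 1:  0->1  1->2  2->3  3->0
   phase 2:  0->2  1->3  2->0  3->1
   phase 3:  0->3  1->0  2->1  3->2

In every phase each node sends to exactly one node and receives from exactly
one node, so no link or switch port is oversubscribed.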

As Brian suggested, I repeated my tests with Open MPI 1.0, see OpenMPI.ps.
The Open MPI MPI_Alltoall shows less congestion than the LAM MPI_Alltoall,
both with and without flow control. Unfortunately, the ordered routines
perform even worse than in the LAM case.

Here are some numbers for the transmission rate T per CPU:

Table 1. Message size = 2 MB, 4 CPUs (the large-message limit of T):
CPUs/node  LAM                            OpenMPI
           MPI_Alltoall  own all-to-all   MPI_Alltoall  own all-to-all
           (flow contr)  (no flow c.)     (flow contr)  (no flow c.)
===========================================================================
    1      68.2 MB/s     69.6 MB/s        64.0 MB/s     64.0 MB/s
    2      48.9 MB/s     45.3 MB/s        48.7 MB/s     37.4 MB/s



Table 2. Message size = 1024 bytes, 32 CPUs. (Note: here MPI_Alltoall does
not show congestion when flow control is enabled.)
CPUs/node  LAM                            OpenMPI
           MPI_Alltoall  own all-to-all   MPI_Alltoall  own all-to-all
           (flow contr)  (no flow c.)     (flow contr)  (no flow c.)
===========================================================================
    1      20.9 MB/s      2.8 MB/s        18.2 MB/s      1.3 MB/s
    2      14.3 MB/s      5.0 MB/s        14.3 MB/s      2.7 MB/s

While for large messages the transmission rates of MPI_Alltoall and the
ordered all-to-alls become comparable, for smaller message sizes the
ordered routines perform worse, due to the barriers between the
communication phases. However, I do not understand why the transmission
rate of the ordered all-to-alls depends so heavily on the number of CPUs
in the Open MPI case, but not in the LAM case. Could it be that the
barrier synchronization times in Open MPI are longer than in LAM?
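
One way to check this would be to time a bare MPI_Barrier loop under both
libraries, with a fragment like the following (inside the usual
MPI_Init/MPI_Finalize boilerplate; just a sketch, I have not measured this):

  /* quick sketch: average time of a bare MPI_Barrier over many calls */
  int rank, k;
  double t0, t1;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Barrier(MPI_COMM_WORLD);             /* synchronize before timing */
  t0 = MPI_Wtime();
  for (k = 0; k < 1000; k++)
    MPI_Barrier(MPI_COMM_WORLD);
  t1 = MPI_Wtime();
  if (rank == 0)
    printf("average MPI_Barrier time: %g seconds\n", (t1 - t0) / 1000);

If the barrier on 32 CPUs turns out to be much slower in Open MPI, that
would explain the difference.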

Any ideas on how to raise the throughput without risking congestion are
welcome!

Regards,
   Carsten

---------------------------------------------------
Dr. Carsten Kutzner
Max Planck Institute for Biophysical Chemistry
Theoretical and Computational Biophysics Department
Am Fassberg 11
37077 Goettingen, Germany
Tel. +49-551-2012313, Fax: +49-551-2012302
eMail ckut...@gwdg.de
http://www.gwdg.de/~ckutzne



---------- Forwarded message ----------
Date: Wed, 2 Nov 2005 08:23:15 -0500
From: Brian Barrett <brbar...@lam-mpi.org>
Reply-To: General LAM/MPI mailing list <l...@lam-mpi.org>
To: General LAM/MPI mailing list <l...@lam-mpi.org>
Subject: Re: LAM: MPI_Alltoall performance and congestion

On Nov 2, 2005, at 7:44 AM, Carsten Kutzner wrote:

> Will a new all-to-all routine be implemented in a future version
> of LAM / OpenMPI? I am willing to contribute my code as well
> if there is interest.

We will probably not be doing any more work on LAM/MPI's collective
routines.  While clearly not optimal for all cases (as you have
experienced), they do appear to be correct.  At this point, we're
hesitant to do anything to regress from a correctness standpoint.

However, we are actively working on improving collective performance
in Open MPI.  Our collective setup in Open MPI is a bit different
than the one in LAM, and some of the algorithms are already quite
different.  The FT-MPI team from University of Tennessee is also
working on some new routines that should give better performance in a
wider range of scenarios.  We would be happy to have contributions
that help improve performance in certain situations - if nothing
else, it gives us a good reference point for our work.  If you are
interested, I would highly recommend trying out one of the Open MPI
release candidates, subscribing to the Open MPI developer's mailing
list, and letting us know.  The Open MPI web page is, of course,
http://www.open-mpi.org/


Thanks,

Brian

-- 
   Brian Barrett
   LAM/MPI developer and all around nice guy
   Have a LAM/MPI day: http://www.lam-mpi.org/


_______________________________________________
This list is archived at http://www.lam-mpi.org/MailArchives/lam/


/* -------- attached test program: all-to-all benchmark (C + MPI) -------- */
#include <stdlib.h>
#include <stdio.h>
#include <mpi.h>
#include <math.h>

/* #define USE_MPE */
#ifdef USE_MPE
#include "/home/ckutzne/mpe_for_lam711/include/mpe.h"
#define GMX_MPE_LOG(event) MPE_Log_event(event, 0, "")
#else
#define GMX_MPE_LOG(event)
#endif

/* Max number of floats to send to each process */
/*  1 2 3 4  5  6  7   8   9  10   11   12   13   14    15    16    17     18     19     20 */
/*  1 2 4 8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 131072 262144 524288 */
#define TOTAL 524288
#define CATEGORIES 20
#define BASE 2
#define OFFSET 0

/* Global variables */
int nprocs, procid;
float rdata[32*TOTAL], sdata[32*TOTAL]; /* test 32 CPUs max. */
int ev_mpi_getinfo_start, ev_mpi_getinfo_finish;

void initialize_data(float *d, float *r, int elements, int procs, int procid)
/* initialize data with test numbers */
{
  int i;
  
  for (i=0; i<elements; i++)
  {
    d[i] = i + procid/10.;  /* data to send */
    r[i] = 0.0;             /* for the results */
  }
}

void print_data(float *d, int elements, int procs, int procid, char *s)
/* prints the test data for each proc in an ordered fashion */
{
  int i,j;
  
  if (procid == 0)
    printf("\n%s\n",s);
  for (j=0; j<procs; j++)
  {
    if (procid == j)
    {
      printf("proc %d (%d floats): ",procid,elements);
  
      for (i=0; i<elements; i++)
        printf("%4.1f ",d[i]);
      printf("\n");
    }
    MPI_Barrier(MPI_COMM_WORLD);
  }
}


/* Our own version of all-to-all communication that
 * avoids network congestion */
void GMX_Alltoall(float *sendbuf, int sendcount, MPI_Datatype sendtype,
                  float *recvbuf, int recvcount, MPI_Datatype recvtype,
                  MPI_Comm comm)
{
   MPI_Status status;
   int ncpu, cpuid, source, dest;
   int i;
      
   /* find out how many procs we have in our comm */
   MPI_Comm_size(comm,&ncpu);
   /* I am node number ... */
   MPI_Comm_rank(comm,&cpuid);
         
   for (i=0; i<ncpu; i++)
   {
     /* send to dest = (cpuid + i) */
     dest = (cpuid + i) % ncpu;     
     /* receive from source = (cpuid - i) */
     source   = (ncpu + cpuid - i) % ncpu;
          
     MPI_Sendrecv(sendbuf+dest  *sendcount, sendcount, sendtype, dest  , 0,
                  recvbuf+source*recvcount, recvcount, recvtype, source, 0,
                  comm, &status);
/*     if (sendcount > 1024*(8/nprocs)) */
/*     if ((i<ncpu-1) && (sendcount*sizeof(sendtype) > 256)) */
     if (i<ncpu-1)
        MPI_Barrier(comm);
   }
}

void GMX_Alltoall_m(float *sendbuf, int sendcount, MPI_Datatype sendtype,
                   float *recvbuf, int recvcount, MPI_Datatype recvtype,
                   MPI_Comm comm)
{
   const int procs_pn = 2;   /* number of MPI processes per node (hard-coded) */
   
   MPI_Request sendrequests[procs_pn], recvrequests[procs_pn];
   MPI_Status  sendstatuses[procs_pn], recvstatuses[procs_pn];
   int ncpus , cpuid , destcpu , sourcecpu ;
   int nnodes, nodeid, destnode, sourcenode;
   int i,j;
   
   /* find out how many procs we have in our comm */
   MPI_Comm_size(comm,&ncpus);
   /* I am node number ... */
   MPI_Comm_rank(comm,&cpuid);
   /* number of nodes */
   nnodes = ncpus / procs_pn;
   /* my nodeid */
   nodeid = cpuid / procs_pn;

   /* loop over nodes */
   for (i=0; i<nnodes; i++)
   {
     destnode   = (         nodeid + i) % nnodes;  /* send to destination node */
     sourcenode = (nnodes + nodeid - i) % nnodes;  /* receive from source node */
     for (j=0; j<procs_pn; j++)
     {
       sourcecpu = sourcenode*procs_pn + j; /* source of data */
       destcpu   = destnode  *procs_pn + j; /* destination of data */    
       MPI_Irecv(recvbuf + sourcecpu*recvcount, recvcount, recvtype, sourcecpu, 0, comm, &recvrequests[j]);
       MPI_Isend(sendbuf + destcpu  *sendcount, sendcount, sendtype, destcpu  , 0, comm, &sendrequests[j]);
     }
     MPI_Waitall(procs_pn,sendrequests,sendstatuses);
     MPI_Waitall(procs_pn,recvrequests,recvstatuses);
     if (i<nnodes-1) 
       MPI_Barrier(comm);       
   }
}


double do_test(int n, int iterations, int ev_start, int ev_finish, int comm_type)
{
  int i,procs_pn,nprocs,nnodes,myrank;
  double t_start, t_finish;
  char *t_string="   ";
  static int bFirst = 1;

  /* processors per node */
  procs_pn = 2;
  /* number of processors */
  MPI_Comm_size(MPI_COMM_WORLD,&nprocs);
  /* my rank */
  MPI_Comm_rank(MPI_COMM_WORLD,&myrank);
  /* number of nodes */
  nnodes = nprocs / procs_pn;

  if (bFirst == 1)
  {
    bFirst = 0;
    initialize_data(sdata, rdata, nprocs*TOTAL, nprocs, myrank);
  }
   
/*  print_data(sdata, nprocs*TOTAL, nprocs, myrank,"Data initialized as"); */
              
  if (procid == 0)
  {
    if (comm_type==0)
      t_string="MPI";
    else
      t_string="GMX";
      
    printf("%s: sending %7d floats (%8d bytes) to %d processes (%7d times) took 
... ",
           t_string,n,n*sizeof(float),nprocs,iterations);
    fflush(NULL);
  }
  MPI_Barrier(MPI_COMM_WORLD);
  GMX_MPE_LOG(ev_start);
  t_start = MPI_Wtime();

  for (i=0; i<iterations; i++)
  { 
    if (comm_type==0)     
      MPI_Alltoall(sdata,n,MPI_FLOAT,rdata,n,MPI_FLOAT,MPI_COMM_WORLD);
    else
      GMX_Alltoall_m(sdata,n,MPI_FLOAT,rdata,n,MPI_FLOAT,MPI_COMM_WORLD);
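      /* (for runs with 1 CPU per node, the simpler GMX_Alltoall could be called here instead) */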
  }
  MPI_Barrier(MPI_COMM_WORLD);
  GMX_MPE_LOG(ev_finish);
  t_finish = MPI_Wtime();
  
  if (procid == 0)
    printf("%10.5f seconds in total\n",t_finish-t_start);
 
/*  print_data(rdata, nprocs*TOTAL, nprocs, procid,"Data after alltoall:"); */
  
  return t_finish-t_start;
}


int main(int argc, char *argv[])
{

  int i,j,commtype;
  double t_res; /* timer resolution [seconds] */
  const int tests=25; /* number of tests to average over */
  double dummy, dum1;
  double sdev[CATEGORIES]; /* standard deviation for each category */
  double t_comm[CATEGORIES][tests]; /* individual time for each test */
  double t_aver[CATEGORIES];        /* average over number of tests */
  double t_min[CATEGORIES], t_max[CATEGORIES]; /* shortest and longest time a test takes */

  /* output results in a file: */
  FILE *f_ptr;
  char filename[11];

  /* MPE events */
  int ev_start[CATEGORIES], ev_finish[CATEGORIES]; 
  
  int howmanyfloats[CATEGORIES];

#ifdef USE_MPE                     
  char *color[20] = { "orange1 ", "orange2 ", "orange3 ", "orange4 ", \
                      "red1    ", "red2    ", "red3    ", "red4    ", \
                      "magenta1", "magenta2", "magenta3", "magenta4", \
                      "blue1   ", "blue2"   , "blue3"   , "blue4"   , \
                      "green1  ", "green2"  , "green3"  , "green4"  , \
                    };
  char desc[] = "123456789012";                            
#endif

  /* initialize parallel universe */
  MPI_Init(&argc,&argv);
  MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
  MPI_Comm_rank(MPI_COMM_WORLD, &procid);
#ifdef USE_MPE
  MPE_Init_log();
  /* Describe MPE states */
  for (i=0; i<CATEGORIES; i++)
  {
    ev_start[i]  = MPE_Log_get_event_number();
    ev_finish[i] = MPE_Log_get_event_number();
    sprintf(desc, "Cat %8d", i);
    MPE_Describe_state(ev_start[i], ev_finish[i], desc, color[i]);
  }
  ev_mpi_getinfo_start = MPE_Log_get_event_number();
  ev_mpi_getinfo_finish= MPE_Log_get_event_number();
  MPE_Describe_state(ev_mpi_getinfo_start, ev_mpi_getinfo_finish, "get comm info", "grey50");
#endif

  t_res = MPI_Wtick();

/* loop over both communication schemes */
for (commtype = 0; commtype <=1; commtype ++) 
{
  if (procid == 0)
    printf("\nAlltoall Test on %d CPUs. %d repetitions.\n",nprocs,tests);

  /* Do the tests! */
  for (i=OFFSET; i<CATEGORIES; i++)
  {
    howmanyfloats[i] = pow(BASE,i);
    t_aver[i] = 0.0;
    
    for (j=0; j<tests; j++)
    {
      t_comm[i][j] = do_test(howmanyfloats[i],1,ev_start[i],ev_finish[i],commtype);
      t_aver[i] += t_comm[i][j];
      if (j==0)
      {
        t_min[i] = t_comm[i][j];
        t_max[i] = t_comm[i][j];
      }
      else
      {
        /* minimum time of a single test */
        if (t_comm[i][j] < t_min[i])
          t_min[i] = t_comm[i][j];
        /* maximum time of a single test */
        if (t_comm[i][j] > t_max[i])
          t_max[i] = t_comm[i][j];
      }
    }
    t_aver[i] /= tests;
  }
  
  if (procid == 0)
  {
    printf("Summary (%d-run average, timer resolution %f):\n",tests,t_res);
    
    if (commtype==0)
      sprintf(filename,"MPI_%2d_%2.2d",tests,nprocs);
    else 
      sprintf(filename,"GMX_%2d_%2.2d",tests,nprocs);
      
    f_ptr = fopen(filename,"w");
    
    for (i=OFFSET; i<CATEGORIES; i++)
    {
      /* calc stdandard deviation for each category: */
      dummy=0.0;
      for (j=0; j<tests; j++)
      {
        dum1   = t_comm[i][j] - t_aver[i];
        dummy += dum1*dum1;
      }
      dummy /= tests-1;
      sdev[i] = sqrt(dummy);
      
      printf("%10d floats took %f (%f) seconds. Min: %f  max: 
%f\n",howmanyfloats[i],t_aver[i],sdev[i],t_min[i],t_max[i]);

      fprintf(f_ptr,"%4d  %10d  %f  %f    %f  %f\n",
                     
nprocs,howmanyfloats[i],t_aver[i],sdev[i],t_min[i],t_max[i]);
    }
    fclose(f_ptr);
  }
} /* end of loop over communication schemes */
  
  MPI_Finalize();
  
  return 0;
} 

Attachment: LAM.ps
Description: PostScript document

Attachment: OpenMPI.ps
Description: PostScript document
