Author: bosilca
Date: 2009-06-26 16:32:31 EDT (Fri, 26 Jun 2009)
New Revision: 21543
URL: https://svn.open-mpi.org/trac/ompi/changeset/21543
Log:
BAD grpcomm now has the ability to execute the modex offline. The MPI process
prepares the send buffer and posts the collective order to the local daemon.
It then registers the callback and returns from the modex exchange. It only
waits for the modex to complete when modex_recv is called. Meanwhile, the
daemon performs the allgather.
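In short, the modex becomes split-phase: the process posts the allgather order
plus a completion callback and returns at once; it only spins for completion
the first time the exchanged data is read. Below is a minimal, self-contained
C sketch of that pattern. post_allgather_order(), progress_one() and
lookup_attr() are hypothetical stand-ins for the orte_rml.send_buffer /
orte_rml.recv_buffer_nb and ORTE_PROGRESSED_WAIT calls used in the real code
in the diff that follows.

/* Hypothetical sketch of the split-phase (offline) modex pattern.
 * None of these helpers are real ORTE APIs. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static volatile bool allgather_complete = false;

/* completion callback: fired once the gathered data has arrived */
static void allgather_cb(const void *gathered, size_t len)
{
    (void)gathered;
    (void)len;                       /* real code unpacks into the modex db */
    allgather_complete = true;
}

/* stand-in: a real runtime would send the order to the local daemon and
 * register a non-blocking receive; here we simply complete immediately */
static void post_allgather_order(void (*cb)(const void *, size_t))
{
    cb(NULL, 0);
}

/* stand-in for driving the event engine one iteration */
static void progress_one(void) { }

/* stand-in for looking an attribute up in the local modex database */
static int lookup_attr(const char *name, void **val, size_t *size)
{
    (void)name;
    *val = NULL;
    *size = 0;
    return 0;
}

/* phase 1: post the collective and return without waiting */
static int modex_post(void)
{
    allgather_complete = false;
    post_allgather_order(allgather_cb);
    return 0;
}

/* phase 2: block only when the exchanged data is first read */
static int modex_get(const char *name, void **val, size_t *size)
{
    while (!allgather_complete) {
        progress_one();
    }
    return lookup_attr(name, val, size);
}

int main(void)
{
    void *val;
    size_t size;

    modex_post();                            /* returns immediately */
    modex_get("example-key", &val, &size);   /* first read waits for completion */
    printf("modex complete: %d\n", (int)allgather_complete);
    return 0;
}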
Text files modified:
   trunk/orte/mca/grpcomm/bad/grpcomm_bad_module.c |   110 +++++++++++++++++++++++++++++++++++++++
   1 files changed, 108 insertions(+), 2 deletions(-)
Modified: trunk/orte/mca/grpcomm/bad/grpcomm_bad_module.c
==============================================================================
--- trunk/orte/mca/grpcomm/bad/grpcomm_bad_module.c (original)
+++ trunk/orte/mca/grpcomm/bad/grpcomm_bad_module.c	2009-06-26 16:32:31 EDT (Fri, 26 Jun 2009)
@@ -52,6 +52,9 @@
static int barrier(void);
static int onesided_barrier(void);
static int modex(opal_list_t *procs);
+static int orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
+                                          const char * attribute_name, void **val,
+                                          size_t *size);
/* Module def */
orte_grpcomm_base_module_t orte_grpcomm_bad_module = {
@@ -63,7 +66,7 @@
barrier,
onesided_barrier,
orte_grpcomm_base_set_proc_attr,
- orte_grpcomm_base_get_proc_attr,
+ orte_grpcomm_bad_get_proc_attr,
modex,
orte_grpcomm_base_purge_proc_attrs
};
@@ -488,8 +491,37 @@
return ORTE_SUCCESS;
}
+static int
+orte_grpcomm_bad_get_proc_attr(const orte_process_name_t proc,
+                               const char * attribute_name, void **val,
+                               size_t *size)
+{
+    if( false == allgather_complete ) {
+        ORTE_PROGRESSED_WAIT(allgather_complete, 0, 1);
+    }
+    return orte_grpcomm_base_get_proc_attr(proc, attribute_name, val, size);
+}
/*** MODEX SECTION ***/
+static void allgather_recv_modex(int status, orte_process_name_t* sender,
+                                 opal_buffer_t *buffer,
+                                 orte_rml_tag_t tag, void *cbdata)
+{
+    opal_buffer_t *allgather_buf = (opal_buffer_t*)cbdata;
+    int rc;
+
+    OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                         "%s grpcomm:bad modex received",
+                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+    if( ORTE_SUCCESS != (rc = orte_grpcomm_base_modex_unpack(buffer, true)) ) {
+        ORTE_ERROR_LOG(rc);
+    }
+    OBJ_RELEASE(allgather_buf);
+
+    allgather_complete = true;
+}
+
static int modex(opal_list_t *procs)
{
int rc;
@@ -499,9 +531,83 @@
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
     if (NULL == procs) {
-        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_peer_modex(true))) {
+        /* The modex will be realized in the background by the daemons. The processes will
+         * only be informed when all data has been collected from all processes. The get_attr
+         * will realize the blocking: it will not return until the data has been received.
+         */
+        opal_buffer_t *buf, *rbuf;
+        orte_grpcomm_coll_t coll_type = ORTE_GRPCOMM_ALLGATHER;
+        bool modex_reqd = true;
+
+        OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                             "%s grpcomm:bad:peer:modex: performing modex",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* setup the buffer that will actually be sent */
+        buf = OBJ_NEW(opal_buffer_t);
+        rbuf = OBJ_NEW(opal_buffer_t);
+
+        /* tell the daemon we are doing an allgather */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &coll_type, 1, ORTE_GRPCOMM_COLL_T))) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+
+        /* put our process name in the buffer so it can be unpacked later */
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, ORTE_PROC_MY_NAME, 1, ORTE_NAME))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        if (ORTE_SUCCESS != (rc = opal_dss.pack(buf, &orte_process_info.arch, 1, OPAL_UINT32))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        /* pack the entries we have received */
+        if (ORTE_SUCCESS != (rc = orte_grpcomm_base_pack_modex_entries(buf, &modex_reqd))) {
+            ORTE_ERROR_LOG(rc);
+            goto cleanup;
+        }
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s grpcomm:bad:peer:modex: executing non-blocking allgather",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* send to local daemon */
+        if (0 > (rc = orte_rml.send_buffer(ORTE_PROC_MY_DAEMON, buf, ORTE_RML_TAG_DAEMON_COLLECTIVE, 0))) {
             ORTE_ERROR_LOG(rc);
+            return rc;
         }
+
+        OPAL_OUTPUT_VERBOSE((2, orte_grpcomm_base_output,
+                             "%s grpcomm:bad allgather buffer sent",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+
+        /* now receive the final result. Be sure to do this in
+         * a manner that allows us to return without being in a recv!
+         */
+        allgather_complete = false;
+        rc = orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_ALLGATHER,
+                                     ORTE_RML_NON_PERSISTENT, allgather_recv_modex, (void*)rbuf);
+        if (rc != ORTE_SUCCESS) {
+            ORTE_ERROR_LOG(rc);
+            return rc;
+        }
+        rbuf = NULL; /* make sure we don't release it yet */
+
+        OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_output,
+                             "%s grpcomm:bad: modex posted",
+                             ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
+    cleanup:
+        if( NULL != buf ) {
+            OBJ_RELEASE(buf);
+        }
+        if( NULL != rbuf ) {
+            OBJ_RELEASE(rbuf);
+        }
+
+        return rc;
     } else {
         if (ORTE_SUCCESS != (rc = orte_grpcomm_base_full_modex(procs, true))) {
             ORTE_ERROR_LOG(rc);
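From the caller's side the change is transparent: modex(NULL) now only posts
the collective order to the local daemon, and the first get_proc_attr is where
the process actually waits for the daemons to finish the allgather. A rough
caller's-eye sketch follows; it is not part of this changeset, the entry
points are assumed from the grpcomm framework API visible in the module table
above, and the key name "example-key" is purely illustrative.

/* Rough caller's-eye sketch (NOT part of this changeset): shows where the
 * blocking now happens. orte_grpcomm.set_proc_attr/modex/get_proc_attr are
 * assumed framework entry points; the attribute key is illustrative. */
#include <stdint.h>
#include <stddef.h>

#include "orte/constants.h"
#include "orte/mca/grpcomm/grpcomm.h"

int example_exchange(orte_process_name_t peer)
{
    int rc;
    uint32_t my_data = 42;
    void *val = NULL;
    size_t size = 0;

    /* contribute local data to the modex send buffer */
    if (ORTE_SUCCESS != (rc = orte_grpcomm.set_proc_attr("example-key",
                                                         &my_data, sizeof(my_data)))) {
        return rc;
    }

    /* posts the allgather order to the local daemon and returns immediately */
    if (ORTE_SUCCESS != (rc = orte_grpcomm.modex(NULL))) {
        return rc;
    }

    /* ... other setup can overlap with the background allgather ... */

    /* the first read of a peer attribute is where the process actually waits */
    rc = orte_grpcomm.get_proc_attr(peer, "example-key", &val, &size);
    return rc;
}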