Interesting. I wonder how your compiler finds the definition of PMIX_ERR_SILENT without pmix_common.h. I just pushed a fix.
George.

On Wed, Oct 28, 2015 at 12:43 AM, Gilles Gouaillardet <gil...@rist.or.jp> wrote:

> George,
>
> i am unable to reproduce the issue.
> if build still breaks for you, could you send me your configure command
> line ?
>
> Cheers,
>
> Gilles
>
>
> On 10/28/2015 1:04 PM, Gilles Gouaillardet wrote:
>
> George,
>
> PMIX_ERR_SILENT is defined in
> opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
>
> i ll have a look at it from now
>
> Cheers,
>
> Gilles
>
> On 10/28/2015 12:02 PM, George Bosilca wrote:
>
> We get a nice compiler complaint:
>
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: In function 'pmix_server_get':
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: 'PMIX_ERR_SILENT' undeclared (first use in this function)
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: (Each undeclared identifier is reported only once
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: for each function it appears in.)
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:117: warning: unused variable 'cd'
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: In function '_process_dmdx_reply':
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:478: error: 'PMIX_ERR_SILENT' undeclared (first use in this function)
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'xptr'
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'pbkt'
> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'xfer'
>
> And he is right:
>
> $find . -name "*.h" -exec grep -Hn PMIX_ERR_SILENT {} +
> ./opal/mca/pmix/pmix1xx/pmix/src/util/error.h:33: if (PMIX_ERR_SILENT != (r)) { \
>
> George.
>
> ---------- Forwarded message ----------
> From: <git...@crest.iu.edu>
> Date: Tue, Oct 27, 2015 at 10:39 PM
> Subject: [OMPI commits] Git: open-mpi/ompi branch master updated. dev-2921-gb603307
> To: ompi-comm...@open-mpi.org
>
>
> This is an automated email from the git hooks/post-receive script. It was
> generated because a ref change was pushed to the repository containing
> the project "open-mpi/ompi".
>
> The branch, master has been updated
>        via  b603307f7d33663ef6fe5941bb0d94bd2be017cb (commit)
>        via  267ca8fcd3a59b780491d80d29e870061d8dac56 (commit)
>       from  3035e140511b082c51ad66e116dd381a083a191d (commit)
>
> Those revisions listed above that are new to this repository have
> not appeared on any other notification email; so we list those
> revisions in full, below.
>
> - Log -----------------------------------------------------------------
>
> https://github.com/open-mpi/ompi/commit/b603307f7d33663ef6fe5941bb0d94bd2be017cb
>
> commit b603307f7d33663ef6fe5941bb0d94bd2be017cb
> Merge: 3035e14 267ca8f
> Author: rhc54 <r...@open-mpi.org>
> Date:   Tue Oct 27 19:39:10 2015 -0700
>
>     Merge pull request #1073 from rhc54/topic/pmix
>
>     Cleanup the PMIx direct modex support.
>
>
>
> https://github.com/open-mpi/ompi/commit/267ca8fcd3a59b780491d80d29e870061d8dac56
>
> commit 267ca8fcd3a59b780491d80d29e870061d8dac56
> Author: Ralph Castain <r...@open-mpi.org>
> Date:   Tue Oct 27 11:01:49 2015 -0700
>
>     Cleanup the PMIx direct modex support. Add an MCA parameter
>     pmix_base_async_modex that will cause the async modex to be used when
>     set to 1. Default it to 0 for now to continue current default behavior.
>
>     Also add an MCA param pmix_base_collect_data to direct that the
>     blocking fence shall return all data to each process. Obviously, this
>     param has no effect if async_modex is used.
> > diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c > index 4c0391d..d0eebb2 100644 > --- a/ompi/runtime/ompi_mpi_init.c > +++ b/ompi/runtime/ompi_mpi_init.c > @@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int > requested, int *provided) > > /* exchange connection info - this function may also act as a barrier > * if data exchange is required. The modex occurs solely across procs > - * in our job, so no proc array is passed. If a barrier is required, > - * the "modex" function will perform it internally > - */ > - OPAL_MODEX(NULL, 1); > + * in our job. If a barrier is required, the "modex" function will > + * perform it internally */ > + OPAL_MODEX(); > > OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier")); > > diff --git a/opal/mca/pmix/base/pmix_base_frame.c > b/opal/mca/pmix/base/pmix_base_frame.c > index e1ab766..6e8a347 100644 > --- a/opal/mca/pmix/base/pmix_base_frame.c > +++ b/opal/mca/pmix/base/pmix_base_frame.c > @@ -31,12 +31,21 @@ > /* Note that this initializer is important -- do not remove it! See > https://github.com/open-mpi/ompi/issues/375 for details. 
*/ > opal_pmix_base_module_t opal_pmix = { 0 }; > -bool opal_pmix_collect_all_data = false; > +bool opal_pmix_collect_all_data = true; > bool opal_pmix_base_allow_delayed_server = false; > int opal_pmix_verbose_output = -1; > +bool opal_pmix_base_async_modex = false; > > static int opal_pmix_base_frame_register(mca_base_register_flag_t flags) > { > + opal_pmix_base_async_modex = false; > + (void) mca_base_var_register("opal", "pmix", "base", "async_modex", > "Use asynchronous modex mode", > + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, > OPAL_INFO_LVL_9, > + MCA_BASE_VAR_SCOPE_READONLY, > &opal_pmix_base_async_modex); > + opal_pmix_collect_all_data = true; > + (void) mca_base_var_register("opal", "pmix", "base", "collect_data", > "Collect all data during modex", > + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, > OPAL_INFO_LVL_9, > + MCA_BASE_VAR_SCOPE_READONLY, > &opal_pmix_collect_all_data); > return OPAL_SUCCESS; > } > > diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h > index f265e01..7223529 100644 > --- a/opal/mca/pmix/pmix.h > +++ b/opal/mca/pmix/pmix.h > @@ -36,6 +36,8 @@ BEGIN_C_DECLS > /* provide access to the framework verbose output without > * exposing the entire base */ > extern int opal_pmix_verbose_output; > +extern bool opal_pmix_collect_all_data; > +extern bool opal_pmix_base_async_modex; > extern int opal_pmix_base_exchange(opal_value_t *info, > opal_pmix_pdata_t *pdat, > int timeout); > @@ -254,10 +256,13 @@ extern int opal_pmix_base_exchange(opal_value_t > *info, > * that takes into account directives and availability of > * non-blocking operations > */ > -#define OPAL_MODEX(p, s) \ > - do { \ > - opal_pmix.commit(); \ > - opal_pmix.fence((p), (s)); \ > +#define OPAL_MODEX() \ > + do { \ > + opal_pmix.commit(); \ > + if (!opal_pmix_base_async_modex) { \ > + opal_pmix.fence(NULL, \ > + opal_pmix_collect_all_data); \ > + } \ > } while(0); > > /** > diff --git a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in > 
b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in > index 0216e34..5a111a1 100644 > --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in > +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in > @@ -183,7 +183,7 @@ BEGIN_C_DECLS > > /**** PMIX ERROR CONSTANTS ****/ > /* PMIx errors are always negative, with 0 reserved for success */ > -#define PMIX_ERROR_MIN -41 // set equal to number of non-zero entries > in enum > +#define PMIX_ERROR_MIN -42 // set equal to number of non-zero entries > in enum > > typedef enum { > PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN, > @@ -230,6 +230,7 @@ typedef enum { > PMIX_ERR_INVALID_CRED, > PMIX_EXISTS, > > + PMIX_ERR_SILENT, > PMIX_ERROR, > PMIX_SUCCESS > } pmix_status_t; > diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c > b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c > index d41be9c..b93ca6d 100644 > --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c > +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c > @@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, > pmix_usock_hdr_t *hdr, > PMIX_RELEASE(bptr); // free's the data region > if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { > PMIX_ERROR_LOG(rc); > + rc = PMIX_ERR_SILENT; // avoid error-logging twice > break; > } > } > diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am > b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am > index 5422b78..88b0468 100644 > --- a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am > +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am > @@ -16,4 +16,5 @@ sources += \ > src/server/pmix_server.c \ > src/server/pmix_server_ops.c \ > src/server/pmix_server_regex.c \ > - src/server/pmix_server_listener.c > + src/server/pmix_server_listener.c \ > + src/server/pmix_server_get.c > diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c > b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c > index d16ae16..85f9e17 
100644 > --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c > +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c > @@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void > *cbdata) > pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata; > pmix_usock_send_t *snd; > pmix_output_verbose(2, pmix_globals.debug_output, > - "[%s:%d] queue callback called: reply to %s:%d on > tag %d," > - "event_is_active=%d", > + "[%s:%d] queue callback called: reply to %s:%d on > tag %d", > __FILE__, __LINE__, > (queue->peer)->info->nptr->nspace, > (queue->peer)->info->rank, (queue->tag), > @@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void > *cbdata) > queue->buf = (b); \ > queue->tag = (t); \ > pmix_output_verbose(2, pmix_globals.debug_output, \ > - "[%s:%d] queue reply to %s:%d on tag %d," \ > - "event_is_active=%d", \ > + "[%s:%d] queue reply to %s:%d on tag %d", \ > __FILE__, __LINE__, \ > (queue->peer)->info->nptr->nspace, \ > - (queue->peer)->info->rank, (queue->tag), \ > - (queue->peer)->send_ev_active); \ > + (queue->peer)->info->rank, (queue->tag)); \ > event_assign(&queue->ev, pmix_globals.evbase, -1, \ > EV_WRITE, _queue_message, queue); \ > event_priority_set(&queue->ev, 0); \ > @@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void > *cbdata) > * someone has been waiting for a request on a remote proc > * in one of our nspaces, but we didn't know all the local procs > * and so couldn't determine the proc was remote */ > - pmix_pending_nspace_fix(nptr); > + pmix_pending_nspace_requests(nptr); > } > /* let the caller know we are done */ > if (NULL != cd->opcbfunc) { > diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c > b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c > new file mode 100644 > index 0000000..2cb75cf > --- /dev/null > +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c > @@ -0,0 +1,552 @@ > +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- 
*/ > +/* > + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. > + * Copyright (c) 2014-2015 Research Organization for Information Science > + * and Technology (RIST). All rights reserved. > + * Copyright (c) 2014-2015 Artem Y. Polyakov < <artpo...@gmail.com> > artpo...@gmail.com>. > + * All rights reserved. > + * Copyright (c) 2015 Mellanox Technologies, Inc. > + * All rights reserved. > + * $COPYRIGHT$ > + * > + * Additional copyrights may follow > + * > + * $HEADER$ > + */ > + > +#include <private/autogen/config.h> > +#include <pmix/rename.h> > +#include <private/types.h> > +#include <private/pmix_stdint.h> > +#include <private/pmix_socket_errno.h> > + > +#include <pmix_server.h> > +#include "src/include/pmix_globals.h" > + > +#ifdef HAVE_STRING_H > +#include <string.h> > +#endif > +#include <fcntl.h> > +#ifdef HAVE_UNISTD_H > +#include <unistd.h> > +#endif > +#ifdef HAVE_SYS_SOCKET_H > +#include <sys/socket.h> > +#endif > +#ifdef HAVE_SYS_UN_H > +#include <sys/un.h> > +#endif > +#ifdef HAVE_SYS_UIO_H > +#include <sys/uio.h> > +#endif > +#ifdef HAVE_SYS_TYPES_H > +#include <sys/types.h> > +#endif > +#include PMIX_EVENT_HEADER > + > +#include "src/class/pmix_list.h" > +#include "src/buffer_ops/buffer_ops.h" > +#include "src/util/argv.h" > +#include "src/util/error.h" > +#include "src/util/output.h" > +#include "src/util/pmix_environ.h" > +#include "src/util/progress_threads.h" > +#include "src/usock/usock.h" > +#include "src/sec/pmix_sec.h" > + > +#include "pmix_server_ops.h" > + > +extern pmix_server_module_t pmix_host_server; > + > +typedef struct { > + pmix_object_t super; > + pmix_event_t ev; > + pmix_status_t status; > + const char *data; > + size_t ndata; > + pmix_dmdx_local_t *lcd; > + pmix_release_cbfunc_t relcbfunc; > + void *cbdata; > +} pmix_dmdx_reply_caddy_t; > +static void dcd_con(pmix_dmdx_reply_caddy_t *p) > +{ > + p->status = PMIX_ERROR; > + p->ndata = 0; > + p->lcd = NULL; > + p->relcbfunc = NULL; > + p->cbdata = NULL; > +} > 
+PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t, > + pmix_object_t, dcd_con, NULL); > + > + > +static void dmdx_cbfunc(pmix_status_t status, const char *data, > + size_t ndata, void *cbdata, > + pmix_release_cbfunc_t relfn, void *relcbdata); > +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, > + pmix_modex_cbfunc_t cbfunc, void > *cbdata); > +static pmix_status_t create_local_tracker(char nspace[], int rank, > + pmix_info_t info[], size_t > ninfo, > + pmix_modex_cbfunc_t cbfunc, > + void *cbdata, > + pmix_dmdx_local_t **lcd); > + > + > +/* declare a function whose sole purpose is to > + * free data that we provided to our host server > + * when servicing dmodex requests */ > +static void relfn(void *cbdata) > +{ > + char *data = (char*)cbdata; > + free(data); > +} > + > + > +pmix_status_t pmix_server_get(pmix_buffer_t *buf, > + pmix_modex_cbfunc_t cbfunc, > + void *cbdata) > +{ > + int32_t cnt; > + pmix_status_t rc; > + int rank; > + char *cptr; > + char nspace[PMIX_MAX_NSLEN+1]; > + pmix_nspace_t *ns, *nptr; > + pmix_info_t *info=NULL; > + size_t ninfo=0; > + pmix_dmdx_local_t *lcd, *cd; > + pmix_rank_info_t *iptr; > + pmix_hash_table_t *ht; > + bool local; > + > + pmix_output_verbose(2, pmix_globals.debug_output, > + "recvd GET"); > + > + /* setup */ > + memset(nspace, 0, sizeof(nspace)); > + > + /* retrieve the nspace and rank of the requested proc */ > + cnt = 1; > + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, > PMIX_STRING))) { > + PMIX_ERROR_LOG(rc); > + return rc; > + } > + (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN); > + free(cptr); > + cnt = 1; > + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, > PMIX_INT))) { > + PMIX_ERROR_LOG(rc); > + return rc; > + } > + /* retrieve any provided info structs */ > + cnt = 1; > + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, > PMIX_SIZE))) { > + PMIX_ERROR_LOG(rc); > + return rc; > + } > + if (0 < ninfo) { > + PMIX_INFO_CREATE(info, ninfo); > + cnt = 
ninfo; > + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, > PMIX_INFO))) { > + PMIX_ERROR_LOG(rc); > + PMIX_INFO_FREE(info, ninfo); > + return rc; > + } > + } > + > + /* find the nspace object for this client */ > + nptr = NULL; > + PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { > + if (0 == strcmp(nspace, ns->nspace)) { > + nptr = ns; > + break; > + } > + } > + > + pmix_output_verbose(2, pmix_globals.debug_output, > + "%s:%d EXECUTE GET FOR %s:%d", > + pmix_globals.myid.nspace, > + pmix_globals.myid.rank, nspace, rank); > + > + if (NULL == nptr || NULL == nptr->server) { > + /* this is for an nspace we don't know about yet, so > + * record the request for data from this process and > + * give the host server a chance to tell us about it */ > + rc = create_local_tracker(nspace, rank, info, ninfo, > + cbfunc, cbdata, &lcd); > + return rc; > + } > + > + /* We have to wait for all local clients to be registered before > + * we can know whether this request is for data from a local or a > + * remote client because one client might ask for data about another > + * client that the host RM hasn't told us about yet. Fortunately, > + * we do know how many clients to expect, so first check to see if > + * all clients have been registered with us */ > + if (!nptr->server->all_registered) { > + /* we cannot do anything further, so just track this request > + * for now */ > + rc = create_local_tracker(nspace, rank, info, ninfo, > + cbfunc, cbdata, &lcd); > + return rc; > + } > + > + /* Since we know about all the local clients in this nspace, > + * let's first try to satisfy the request with any available data. 
> + * By default, we assume we are looking for data from a remote > + * client, and then check to see if this is one of my local > + * clients - if so, then we look in that hash table */ > + ht = &nptr->server->remote; > + local = false; > + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { > + if (iptr->rank == rank) { > + /* it is known local client - check the local table */ > + ht = &nptr->server->mylocal; > + local = true; > + break; > + } > + } > + > + /* see if we already have this data */ > + rc = _satisfy_request(ht, rank, cbfunc, cbdata); > + if( PMIX_SUCCESS == rc ){ > + /* request was successfully satisfied */ > + PMIX_INFO_FREE(info, ninfo); > + return rc; > + } > + > + /* If we get here, then we don't have the data at this time. Check > + * to see if we already have a pending request for the data - if > + * we do, then we can just wait for it to arrive */ > + rc = create_local_tracker(nspace, rank, info, ninfo, > + cbfunc, cbdata, &lcd); > + if (PMIX_SUCCESS == rc) { > + /* we are already waiting for the data - nothing more > + * for us to do as the function added the new request > + * to the tracker for us */ > + return PMIX_SUCCESS; > + } > + if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) { > + /* we have a problem - e.g., out of memory */ > + return rc; > + } > + > + /* Getting here means that we didn't already have a request for > + * for data pending, and so we created a new tracker for this > + * request. 
We know the identity of all our local clients, so > + * if this is one, then we have nothing further to do - we will > + * fulfill the request once the process commits its data */ > + if (local) { > + return PMIX_SUCCESS; > + } > + > + /* this isn't a local client of ours, so we need to ask the host > + * resource manager server to please get the info for us from > + * whomever is hosting the target process */ > + if (NULL != pmix_host_server.direct_modex) { > + rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, > dmdx_cbfunc, lcd); > + } else { > + /* if we don't have direct modex feature, just respond with "not > found" */ > + cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL); > + PMIX_INFO_FREE(info, ninfo); > + pmix_list_remove_item(&pmix_server_globals.local_reqs, > &lcd->super); > + PMIX_LIST_DESTRUCT(&lcd->loc_reqs); > + PMIX_RELEASE(lcd); > + rc = PMIX_ERR_NOT_FOUND; > + } > + > + return rc; > +} > + > +static pmix_status_t create_local_tracker(char nspace[], int rank, > + pmix_info_t info[], size_t > ninfo, > + pmix_modex_cbfunc_t cbfunc, > + void *cbdata, > + pmix_dmdx_local_t **ld) > +{ > + pmix_dmdx_local_t *lcd, *cd; > + pmix_dmdx_request_t *req; > + pmix_status_t rc; > + > + /* define default */ > + *ld = NULL; > + > + /* see if we already have an existing request for data > + * from this namespace/rank */ > + lcd = NULL; > + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, > pmix_dmdx_local_t) { > + if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || > + rank != cd->proc.rank ) { > + continue; > + } > + lcd = cd; > + break; > + } > + if (NULL != lcd) { > + /* we already have a request, so just track that someone > + * else wants data from the same target */ > + rc = PMIX_SUCCESS; // indicates we found an existing request > + goto complete; > + } > + /* we do not have an existing request, so let's create > + * one and add it to our list */ > + lcd = PMIX_NEW(pmix_dmdx_local_t); > + if (NULL == lcd){ > + 
PMIX_INFO_FREE(info, ninfo); > + return PMIX_ERR_NOMEM; > + } > + strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN); > + lcd->proc.rank = rank; > + lcd->info = info; > + lcd->ninfo = ninfo; > + pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super); > + rc = PMIX_ERR_NOT_FOUND; // indicates that we created a new request > tracker > + > + complete: > + /* track this specific requestor so we return the > + * data to them */ > + req = PMIX_NEW(pmix_dmdx_request_t); > + req->cbfunc = cbfunc; > + req->cbdata = cbdata; > + pmix_list_append(&lcd->loc_reqs, &req->super); > + *ld = lcd; > + return rc; > +} > + > +void pmix_pending_nspace_requests(pmix_nspace_t *nptr) > +{ > + pmix_dmdx_local_t *cd, *cd_next; > + > + /* Now that we know all local ranks, go along request list and ask > for remote data > + * for the non-local ranks, and resolve all pending requests for > local procs > + * that were waiting for registration to complete > + */ > + PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, > pmix_dmdx_local_t) { > + pmix_rank_info_t *info; > + bool found = false; > + > + if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) > { > + continue; > + } > + > + PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) { > + if (info->rank == cd->proc.rank) { > + found = true; // we will satisy this request upon commit > from new proc > + break; > + } > + } > + > + /* if not found - this is remote process and we need to send > + * corresponding direct modex request */ > + if( !found ){ > + if( NULL != pmix_host_server.direct_modex ){ > + pmix_host_server.direct_modex(&cd->proc, cd->info, > cd->ninfo, dmdx_cbfunc, cd); > + } else { > + pmix_dmdx_request_t *req, *req_next; > + PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, > pmix_dmdx_request_t) { > + req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, > NULL, NULL); > + pmix_list_remove_item(&cd->loc_reqs, &req->super); > + PMIX_RELEASE(req); > + } > + 
pmix_list_remove_item(&pmix_server_globals.local_reqs, > &cd->super); > + PMIX_RELEASE(cd); > + } > + } > + } > +} > + > +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, > + pmix_modex_cbfunc_t cbfunc, void > *cbdata) > +{ > + pmix_status_t rc; > + pmix_value_t *val; > + char *data; > + size_t sz; > + pmix_buffer_t xfer, pbkt, *xptr; > + > + /* check to see if this data already has been > + * obtained as a result of a prior direct modex request from > + * a remote peer, or due to data from a local client > + * having been committed */ > + rc = pmix_hash_fetch(ht, rank, "modex", &val); > + if (PMIX_SUCCESS == rc && NULL != val) { > + /* the client is expecting this to arrive as a byte object > + * containing a buffer, so package it accordingly */ > + PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); > + PMIX_CONSTRUCT(&xfer, pmix_buffer_t); > + xptr = &xfer; > + PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); > + pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER); > + xfer.base_ptr = NULL; // protect the passed data > + xfer.bytes_used = 0; > + PMIX_DESTRUCT(&xfer); > + PMIX_UNLOAD_BUFFER(&pbkt, data, sz); > + PMIX_DESTRUCT(&pbkt); > + PMIX_VALUE_RELEASE(val); > + /* pass it back */ > + cbfunc(rc, data, sz, cbdata, relfn, data); > + return rc; > + } > + return PMIX_ERR_NOT_FOUND; > +} > + > +/* Resolve pending requests to this namespace/rank */ > +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, > + pmix_status_t status, > pmix_dmdx_local_t *lcd) > +{ > + pmix_dmdx_local_t *cd; > + > + /* find corresponding request (if exists) */ > + if (NULL == lcd && NULL != nptr) { > + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, > pmix_dmdx_local_t) { > + if (0 != strncmp(nptr->nspace, cd->proc.nspace, > PMIX_MAX_NSLEN) || > + rank != cd->proc.rank) { > + continue; > + } > + lcd = cd; > + break; > + } > + } > + > + /* If somebody was interested in this rank */ > + if (NULL != lcd) { > + pmix_dmdx_request_t *req; > + > + if 
(PMIX_SUCCESS != status){ > + /* if we've got an error for this request - just forward it*/ > + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { > + /* if we can't satisfy this request - respond with error > */ > + req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL); > + } > + } else if (NULL != nptr) { > + /* if we've got the blob - try to satisfy requests */ > + pmix_hash_table_t *ht; > + pmix_rank_info_t *iptr; > + > + /* by default we are looking for the remote data */ > + ht = &nptr->server->remote; > + /* check if this rank is local */ > + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, > pmix_rank_info_t) { > + if (iptr->rank == rank) { > + ht = &nptr->server->mylocal; > + break; > + } > + } > + > + /* run through all the requests to this rank */ > + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { > + pmix_status_t rc; > + rc = _satisfy_request(ht, rank, req->cbfunc, req->cbdata); > + if( PMIX_SUCCESS != rc ){ > + /* if we can't satisfy this particular request > (missing key?) 
*/ > + req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL); > + } > + } > + } > + /* remove all requests to this rank and cleanup the corresponding > structure */ > + pmix_list_remove_item(&pmix_server_globals.local_reqs, > (pmix_list_item_t*)lcd); > + PMIX_RELEASE(lcd); > + } > + return PMIX_SUCCESS; > +} > + > +/* process the returned data from the host RM server */ > +static void _process_dmdx_reply(int fd, short args, void *cbdata) > +{ > + pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata; > + pmix_kval_t *kp; > + pmix_nspace_t *ns, *nptr; > + pmix_status_t rc; > + pmix_buffer_t xfer, pbkt, *xptr; > + > + pmix_output_verbose(2, pmix_globals.debug_output, > + "[%s:%d] process dmdx reply from %s:%d", > + __FILE__, __LINE__, > + caddy->lcd->proc.nspace, caddy->lcd->proc.rank); > + > + /* find the nspace object for this client */ > + nptr = NULL; > + PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { > + if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) { > + nptr = ns; > + break; > + } > + } > + > + if (NULL == nptr) { > + /* should be impossible */ > + PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); > + caddy->status = PMIX_ERR_NOT_FOUND; > + goto cleanup; > + } > + > + /* if the request was successfully satisfied, then store the data > + * in our hash table for remote procs. Although we could immediately > + * resolve any outstanding requests on our tracking list, we instead > + * store the data first so we can immediately satisfy any future > + * requests. 
Then, rather than duplicate the resolve code here, we > + * will let the pmix_pending_resolve function go ahead and retrieve > + * it from the hash table */ > + if (PMIX_SUCCESS == caddy->status) { > + kp = PMIX_NEW(pmix_kval_t); > + kp->key = strdup("modex"); > + PMIX_VALUE_CREATE(kp->value, 1); > + kp->value->type = PMIX_BYTE_OBJECT; > + /* we don't know if the host is going to save this data > + * or not, so we have to copy it - the client is expecting > + * this to arrive as a byte object containing a buffer, so > + * package it accordingly */ > + kp->value->data.bo.bytes = malloc(caddy->ndata); > + memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata); > + kp->value->data.bo.size = caddy->ndata; > + /* store it in the appropriate hash */ > + if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, > caddy->lcd->proc.rank, kp))) { > + PMIX_ERROR_LOG(rc); > + } > + PMIX_RELEASE(kp); // maintain acctg > + } > + > + /* always execute the callback to avoid having the client hang */ > + pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, > caddy->lcd); > + > +cleanup: > + /* now call the release function so the host server > + * knows it can release the data */ > + if (NULL != caddy->relcbfunc) { > + caddy->relcbfunc(caddy->cbdata); > + } > + PMIX_RELEASE(caddy); > +} > + > +/* this is the callback function that the host RM server will call > + * when it gets requested info back from a remote server */ > +static void dmdx_cbfunc(pmix_status_t status, > + const char *data, size_t ndata, void *cbdata, > + pmix_release_cbfunc_t release_fn, void > *release_cbdata) > +{ > + pmix_dmdx_reply_caddy_t *caddy; > + > + /* because the host RM is calling us from their own thread, we > + * need to thread-shift into our local progress thread before > + * accessing any global info */ > + caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t); > + caddy->status = status; > + /* point to the callers cbfunc */ > + caddy->relcbfunc = release_fn; > + caddy->cbdata = 
release_cbdata; > + > + /* point to the returned data and our own internal > + * tracker */ > + caddy->data = data; > + caddy->ndata = ndata; > + caddy->lcd = (pmix_dmdx_local_t *)cbdata; > + pmix_output_verbose(2, pmix_globals.debug_output, > + "[%s:%d] queue dmdx reply for %s:%d", > + __FILE__, __LINE__, > + caddy->lcd->proc.nspace, caddy->lcd->proc.rank); > + event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE, > + _process_dmdx_reply, caddy); > + event_priority_set(&caddy->ev, 0); > + event_active(&caddy->ev, EV_WRITE, 1); > +} > + > diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c > b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c > index 4a4abd1..43d35b5 100644 > --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c > +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c > @@ -58,246 +58,6 @@ > > pmix_server_module_t pmix_host_server = {0}; > > -static void dmdx_cbfunc(pmix_status_t status, const char *data, > - size_t ndata, void *cbdata, > - pmix_release_cbfunc_t relfn, void *relcbdata); > - > -typedef struct { > - pmix_object_t super; > - pmix_event_t ev; > - pmix_status_t status; > - const char *data; > - size_t ndata; > - pmix_dmdx_local_t *lcd; > - pmix_release_cbfunc_t relcbfunc; > - void *cbdata; > -} pmix_dmdx_reply_caddy_t; > -PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t, > - pmix_object_t, NULL, NULL); > - > - > -static void relfn(void *cbdata) > -{ > - char *data = (char*)cbdata; > - free(data); > -} > - > -static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank, > - pmix_hash_table_t *ht, > - pmix_modex_cbfunc_t cbfunc, void > *cbdata) > -{ > - pmix_status_t rc; > - pmix_buffer_t pbkt, xfer; > - pmix_value_t *val; > - char *data; > - size_t sz; > - > - /* check to see if this data already has been > - * obtained as a result of a prior direct modex request from > - * another local peer */ > - rc = pmix_hash_fetch(ht, rank, "modex", &val); > - if (PMIX_SUCCESS == rc && NULL != val) { > 
- PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); > - PMIX_CONSTRUCT(&xfer, pmix_buffer_t); > - pmix_buffer_t *pxfer = &xfer; > - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); > - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER); > - xfer.base_ptr = NULL; > - xfer.bytes_used = 0; > - PMIX_DESTRUCT(&xfer); > - PMIX_VALUE_RELEASE(val); > - PMIX_UNLOAD_BUFFER(&pbkt, data, sz); > - PMIX_DESTRUCT(&pbkt); > - /* pass it back */ > - cbfunc(rc, data, sz, cbdata, relfn, data); > - return rc; > - } > - return PMIX_ERR_NOT_FOUND; > -} > - > -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank, > - pmix_info_t *info, size_t ninfo, > - pmix_modex_cbfunc_t cbfunc, void > *cbdata) > -{ > - pmix_dmdx_local_t *lcd = NULL, *cd; > - pmix_rank_info_t *iptr; > - pmix_hash_table_t *ht; > - pmix_status_t rc; > - > - /* 1. Try to satisfy the request right now */ > - > - /* by default we are looking for the remote data */ > - ht = &nptr->server->remote; > - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { > - if (iptr->rank == rank) { > - /* in case it is known local rank - check local table */ > - ht = &nptr->server->mylocal; > - break; > - } > - } > - > - rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata); > - if( PMIX_SUCCESS == rc ){ > - /* request was successfully satisfied */ > - PMIX_INFO_FREE(info, ninfo); > - return rc; > - } > - > - /* 2. We were unable to satisfy request right now. > - * Look for existing requests to this namespace/rank */ > - PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, > pmix_dmdx_local_t) { > - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || > - rank != cd->proc.rank ) { > - continue; > - } > - lcd = cd; > - break; > - } > - > - /* 3. If no requests exists then: > - * - if all local clients are registered then we were called because > - * the remote data was requested. 
Create request and call direct modex
> -     * to retrieve the data
> -     * - if not all local ranks were registered, we need to wait untill
> -     * pmix_pending_localy_fin would be called to resolve this. Just add the
> -     * request for now.
> -     */
> -    if (NULL == lcd) {
> -        lcd = PMIX_NEW(pmix_dmdx_local_t);
> -        if (NULL == lcd){
> -            PMIX_INFO_FREE(info, ninfo);
> -            return PMIX_ERR_NOMEM;
> -        }
> -        strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
> -        lcd->proc.rank = rank;
> -        lcd->info = info;
> -        lcd->ninfo = ninfo;
> -        pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
> -
> -        /* check & send request if need/possible */
> -        if (nptr->server->all_registered && NULL == info) {
> -            if (NULL != pmix_host_server.direct_modex) {
> -                pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
> -            } else {
> -                /* if we don't have direct modex feature, just respond with "not found" */
> -                cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
> -                PMIX_INFO_FREE(info, ninfo);
> -                pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
> -                PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
> -                PMIX_RELEASE(lcd);
> -                return PMIX_SUCCESS;
> -            }
> -        }
> -    }
> -    pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t);
> -    req->cbfunc = cbfunc;
> -    req->cbdata = cbdata;
> -    pmix_list_append(&lcd->loc_reqs, &req->super);
> -    return PMIX_SUCCESS;
> -}
> -
> -void pmix_pending_nspace_fix(pmix_nspace_t *nptr)
> -{
> -    pmix_dmdx_local_t *cd, *cd_next;
> -
> -    /* Now when we know all local ranks, go along request list and ask for remote data
> -     * for the non-local ranks, and resolve all pending requests for local procs
> -     * that were waiting for registration to complete
> -     */
> -    PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
> -        pmix_rank_info_t *info;
> -        bool found = false;
> -
> -        if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
> -            continue;
> -        }
> -
> -        PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
> -            if (info->rank == cd->proc.rank) {
> -                found = true;
> -                break;
> -            }
> -        }
> -
> -        /* if not found - this is remote process and we need to send
> -         * corresponding direct modex request */
> -        if( !found ){
> -            if( NULL != pmix_host_server.direct_modex ){
> -                pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
> -            } else {
> -                pmix_dmdx_request_t *req, *req_next;
> -                PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
> -                    req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
> -                    pmix_list_remove_item(&cd->loc_reqs, &req->super);
> -                    PMIX_RELEASE(req);
> -                }
> -                pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
> -                PMIX_RELEASE(cd);
> -            }
> -        }
> -    }
> -}
> -
> -/* Resolve pending requests to this namespace/rank */
> -pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
> -                                   pmix_status_t status, pmix_dmdx_local_t *lcd)
> -{
> -    pmix_dmdx_local_t *cd;
> -
> -    /* find corresponding request (if exists) */
> -    if( NULL == lcd ){
> -        PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
> -            if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
> -                    rank != cd->proc.rank) {
> -                continue;
> -            }
> -            lcd = cd;
> -            break;
> -        }
> -    }
> -
> -    /* If somebody was interested in this rank */
> -    if( NULL != lcd ){
> -        pmix_dmdx_request_t *req;
> -
> -        if (PMIX_SUCCESS != status){
> -            /* if we've got an error for this request - just forward it*/
> -            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
> -                /* if we can't satisfy this request - respond with error */
> -                req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
> -            }
> -        } else {
> -            /* if we've got the blob - try to satisfy requests */
> -            pmix_hash_table_t *ht;
> -            pmix_rank_info_t *iptr;
> -
> -            /* by default we are looking for the remote data */
> -            ht = &nptr->server->remote;
> -            /* check if
this rank is local */
> -            PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
> -                if (iptr->rank == rank) {
> -                    ht = &nptr->server->mylocal;
> -                    break;
> -                }
> -            }
> -
> -            /* run through all the requests to this rank */
> -            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
> -                pmix_status_t rc;
> -                rc = _satisfy_request(nptr, rank, ht, req->cbfunc, req->cbdata);
> -                if( PMIX_SUCCESS != rc ){
> -                    /* if we can't satisfy this particular request (missing key?) */
> -                    req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
> -                }
> -            }
> -        }
> -        /* remove all requests to this rank and cleanup the corresponding structure */
> -        pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
> -        PMIX_RELEASE(lcd);
> -    }
> -    return PMIX_SUCCESS;
> -}
> -
>  pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
>                                  pmix_op_cbfunc_t cbfunc, void *cbdata)
>  {
> @@ -436,13 +196,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
>           * may not be a contribution */
>          if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote, info->rank, "modex", &val) &&
>              NULL != val) {
> -            PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
> -            PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
> -            pmix_buffer_t *pxfer = &xfer;
> -            pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
> -            xfer.base_ptr = NULL;
> -            xfer.bytes_used = 0;
> -            PMIX_DESTRUCT(&xfer);
> +            PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes, val->data.bo.size);
>              PMIX_VALUE_RELEASE(val);
>          }
>          PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
> @@ -457,7 +211,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
>              PMIX_RELEASE(dcd);
>          }
>      }
> -    /* see if anyone local is waiting on this data- could be more than one */
> +    /* see if anyone local is waiting on this data - could be more than one */
>      return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL);
>  }
>
> @@ -826,163 +580,6 @@ pmix_status_t
pmix_server_fence(pmix_server_caddy_t *cd,
>      return rc;
>  }
>
> -static void _process_dmdx_reply(int fd, short args, void *cbdata)
> -{
> -    pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
> -    pmix_kval_t *kp;
> -    pmix_nspace_t *ns, *nptr;
> -    pmix_status_t rc;
> -
> -    pmix_output_verbose(2, pmix_globals.debug_output,
> -                        "[%s:%d] queue dmdx reply from %s:%d",
> -                        __FILE__, __LINE__,
> -                        caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
> -
> -    /* find the nspace object for this client */
> -    nptr = NULL;
> -    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
> -        if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
> -            nptr = ns;
> -            break;
> -        }
> -    }
> -
> -    if (NULL == nptr) {
> -        /* should be impossible */
> -        PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
> -        caddy->status = PMIX_ERR_NOT_FOUND;
> -        goto cleanup;
> -    }
> -
> -    if (PMIX_SUCCESS == caddy->status) {
> -        kp = PMIX_NEW(pmix_kval_t);
> -        kp->key = strdup("modex");
> -        PMIX_VALUE_CREATE(kp->value, 1);
> -        kp->value->type = PMIX_BYTE_OBJECT;
> -        /* we don't know if the host is going to save this data
> -         * or not, so we have to copy it */
> -        kp->value->data.bo.bytes = (char*)malloc(caddy->ndata);
> -        memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
> -        kp->value->data.bo.size = caddy->ndata;
> -        /* store it in the appropriate hash */
> -        if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
> -            PMIX_ERROR_LOG(rc);
> -        }
> -        PMIX_RELEASE(kp); // maintain acctg
> -    }
> -
> -cleanup:
> -    /* always execute the callback to avoid having the client hang */
> -    pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
> -
> -    /* now call the release function so the host server
> -     * knows it can release the data */
> -    if (NULL != caddy->relcbfunc) {
> -        caddy->relcbfunc(caddy->cbdata);
> -    }
> -    PMIX_RELEASE(caddy);
> -}
> -
> -static void dmdx_cbfunc(pmix_status_t status,
> -                        const char *data, size_t ndata,
void *cbdata,
> -                        pmix_release_cbfunc_t release_fn, void *release_cbdata)
> -{
> -    pmix_dmdx_reply_caddy_t *caddy;
> -    caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
> -    caddy->status = status;
> -    /* point to the callers cbfunc */
> -    caddy->relcbfunc = release_fn;
> -    caddy->cbdata = release_cbdata;
> -
> -    caddy->data = data;
> -    caddy->ndata = ndata;
> -    caddy->lcd = (pmix_dmdx_local_t *)cbdata;
> -    pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] queue dmdx reply %s:%d",
> -                        __FILE__, __LINE__,
> -                        caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
> -    event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
> -                 _process_dmdx_reply, caddy);
> -    event_priority_set(&caddy->ev, 0);
> -    event_active(&caddy->ev, EV_WRITE, 1);
> -}
> -
> -pmix_status_t pmix_server_get(pmix_buffer_t *buf,
> -                              pmix_modex_cbfunc_t cbfunc,
> -                              void *cbdata)
> -{
> -    int32_t cnt;
> -    pmix_status_t rc;
> -    int rank;
> -    char *cptr;
> -    char nspace[PMIX_MAX_NSLEN+1];
> -    pmix_nspace_t *ns, *nptr;
> -    pmix_info_t *info=NULL;
> -    size_t ninfo=0;
> -
> -    pmix_output_verbose(2, pmix_globals.debug_output,
> -                        "recvd GET");
> -
> -    /* setup */
> -    memset(nspace, 0, sizeof(nspace));
> -
> -    /* retrieve the nspace and rank of the requested proc */
> -    cnt = 1;
> -    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
> -        PMIX_ERROR_LOG(rc);
> -        return rc;
> -    }
> -    (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
> -    free(cptr);
> -    cnt = 1;
> -    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
> -        PMIX_ERROR_LOG(rc);
> -        return rc;
> -    }
> -    /* find the nspace object for this client */
> -    nptr = NULL;
> -    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
> -        if (0 == strcmp(nspace, ns->nspace)) {
> -            nptr = ns;
> -            break;
> -        }
> -    }
> -
> -    pmix_output_verbose(2, pmix_globals.debug_output,
> -                        "%s:%d EXECUTE GET FOR %s:%d",
> -                        pmix_globals.myid.nspace,
> -                        pmix_globals.myid.rank, nspace, rank);
> -
> -    /* retrieve any
provided info structs */
> -    cnt = 1;
> -    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
> -        PMIX_ERROR_LOG(rc);
> -        return rc;
> -    }
> -    if (0 < ninfo) {
> -        PMIX_INFO_CREATE(info, ninfo);
> -        cnt = ninfo;
> -        if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
> -            PMIX_ERROR_LOG(rc);
> -            PMIX_INFO_FREE(info, ninfo);
> -            return rc;
> -        }
> -    }
> -
> -    if (NULL == nptr) {
> -        /* this is for an nspace we don't know about yet, so
> -         * give the host server a chance to tell us about it */
> -        nptr = PMIX_NEW(pmix_nspace_t);
> -        (void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN);
> -        pmix_list_append(&pmix_globals.nspaces, &nptr->super);
> -    }
> -    /* if we don't have any ranks for this job, protect ourselves here */
> -    if (NULL == nptr->server) {
> -        nptr->server = PMIX_NEW(pmix_server_nspace_t);
> -    }
> -
> -    return pmix_pending_request(nptr, rank, info, ninfo, cbfunc, cbdata);
> -}
> -
>  pmix_status_t pmix_server_publish(pmix_peer_t *peer,
>                                    pmix_buffer_t *buf,
>                                    pmix_op_cbfunc_t cbfunc, void *cbdata)
> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
> index c6279d5..9129b6b 100644
> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
> @@ -183,10 +183,7 @@ void pmix_stop_listening(void);
>
>  bool pmix_server_trk_update(pmix_server_trkr_t *trk);
>
> -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
> -                                   pmix_info_t *info, size_t ninfo,
> -                                   pmix_modex_cbfunc_t cbfunc, void *cbdata);
> -void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
> +void pmix_pending_nspace_requests(pmix_nspace_t *nptr);
>  pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
>                                     pmix_status_t status, pmix_dmdx_local_t *lcd);
>
> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
> index
8cc4bcd..90c42ed 100644
> --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
> +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
> @@ -123,6 +123,8 @@ const char* PMIx_Error_string(pmix_status_t errnum)
>      case PMIX_EXISTS:
>          return "EXISTS";
>
> +    case PMIX_ERR_SILENT:
> +        return "SILENT";
>      case PMIX_ERROR:
>          return "ERROR";
>      case PMIX_SUCCESS:
> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
> index f72227a..e43ac47 100644
> --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
> +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
> @@ -28,9 +28,13 @@
>
>  BEGIN_C_DECLS
>
> -#define PMIX_ERROR_LOG(r) \
> -    pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
> -                PMIx_Error_string((r)), __FILE__, __LINE__);
> +#define PMIX_ERROR_LOG(r) \
> +    do { \
> +        if (PMIX_ERR_SILENT != (r)) { \
> +            pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
> +                        PMIx_Error_string((r)), __FILE__, __LINE__); \
> +        } \
> +    }while(0);
>
>  #define PMIX_REPORT_ERROR(e) \
>      do { \
> diff --git a/opal/mca/pmix/pmix1xx/pmix1_client.c b/opal/mca/pmix/pmix1xx/pmix1_client.c
> index f1ba0d5..e9c50b7 100644
> --- a/opal/mca/pmix/pmix1xx/pmix1_client.c
> +++ b/opal/mca/pmix/pmix1xx/pmix1_client.c
> @@ -217,6 +217,7 @@ int pmix1_store_local(const opal_process_name_t *proc, opal_value_t *val)
>          }
>      }
>      if (NULL == job) {
> +        OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
>          return OPAL_ERR_NOT_FOUND;
>      }
>      (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
> diff --git a/opal/mca/pmix/pmix1xx/pmix1_server_south.c b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
> index ae42de0..f0d0f11 100644
> --- a/opal/mca/pmix/pmix1xx/pmix1_server_south.c
> +++ b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
> @@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
>  }
>
>  int pmix1_server_register_nspace(opal_jobid_t jobid,
> -                                 int nlocalprocs,
> -                                 opal_list_t *info,
> -                                 opal_pmix_op_cbfunc_t cbfunc,
> -                                 void *cbdata)
> +                                 int
nlocalprocs,
> +                                 opal_list_t *info,
> +                                 opal_pmix_op_cbfunc_t cbfunc,
> +                                 void *cbdata)
>  {
>      opal_value_t *kv, *k2;
>      pmix_info_t *pinfo, *pmap;
> @@ -168,10 +168,17 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
>      pmix_status_t rc;
>      pmix1_opcaddy_t *op;
>      opal_list_t *pmapinfo;
> +    opal_pmix1_jobid_trkr_t *job;
>
>      /* convert the jobid */
>      (void)snprintf(nspace, PMIX_MAX_NSLEN, opal_convert_jobid_to_string(jobid));
>
> +    /* store this job in our list of known nspaces */
> +    job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
> +    (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
> +    job->jobid = jobid;
> +    opal_list_append(&mca_pmix_pmix1xx_component.jobids, &job->super);
> +
>      /* convert the list to an array of pmix_info_t */
>      if (NULL != info) {
>          sz = opal_list_get_size(info);
> @@ -220,10 +227,10 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
>
>
>  int pmix1_server_register_client(const opal_process_name_t *proc,
> -                                 uid_t uid, gid_t gid,
> -                                 void *server_object,
> -                                 opal_pmix_op_cbfunc_t cbfunc,
> -                                 void *cbdata)
> +                                 uid_t uid, gid_t gid,
> +                                 void *server_object,
> +                                 opal_pmix_op_cbfunc_t cbfunc,
> +                                 void *cbdata)
>  {
>      pmix_status_t rc;
>      pmix1_opcaddy_t *op;
> @@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t status, char *data, size_t sz, void *cbd
>  }
>
>  int pmix1_server_dmodex(const opal_process_name_t *proc,
> -                        opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
> +                        opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
>  {
>      pmix1_opcaddy_t *op;
>      pmix_status_t rc;
> diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c
> index 953145d..ee5582c 100644
> --- a/orte/orted/pmix/pmix_server.c
> +++ b/orte/orted/pmix/pmix_server.c
> @@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
>      int rc, ret, room_num;
>      int32_t cnt;
>      opal_process_name_t target;
> -    opal_value_t kv;
>      pmix_server_req_t *req;
>      uint8_t *data = NULL;
>      int32_t ndata = 0;
> @@ -542,29 +541,14 @@ static void
pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
>          return;
>      }
>
> -    /* if we got something, store the blobs locally so we can
> -     * meet any further requests without doing a remote fetch.
> -     * This must be done as a single blob for later retrieval */
> -    if (ORTE_SUCCESS == ret && NULL != data) {
> -        OBJ_CONSTRUCT(&kv, opal_value_t);
> -        kv.key = strdup("modex");
> -        kv.type = OPAL_BYTE_OBJECT;
> -        kv.data.bo.bytes = data;
> -        kv.data.bo.size = ndata;
> -        if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&target, &kv))) {
> -            ORTE_ERROR_LOG(rc);
> -        }
> -        kv.data.bo.bytes = NULL; // protect the data
> -        kv.data.bo.size = 0;
> -        OBJ_DESTRUCT(&kv);
> -    }
> -
>      /* check the request out of the tracking hotel */
>      opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs, room_num, (void**)&req);
>      /* return the returned data to the requestor */
>      if (NULL != req) {
>          if (NULL != req->mdxcbfunc) {
>              req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata, relcbfunc, data);
> +        } else {
> +            free(data);
>          }
>          OBJ_RELEASE(req);
>      }
> diff --git a/orte/orted/pmix/pmix_server_fence.c b/orte/orted/pmix/pmix_server_fence.c
> index b3b0e33..765c1c2 100644
> --- a/orte/orted/pmix/pmix_server_fence.c
> +++ b/orte/orted/pmix/pmix_server_fence.c
> @@ -197,6 +197,12 @@ static void dmodex_req(int sd, short args, void *cbdata)
>          goto callback;
>      }
>
> +    /* if we are the host daemon, then this is a local request, so
> +     * just wait for the data to come in */
> +    if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
> +        return;
> +    }
> +
>      /* construct a request message */
>      buf = OBJ_NEW(opal_buffer_t);
>      if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1, OPAL_NAME))) {
>
>
> -----------------------------------------------------------------------
>
> Summary of changes:
>  ompi/runtime/ompi_mpi_init.c | 7 +-
>  opal/mca/pmix/base/pmix_base_frame.c | 11 +-
>  opal/mca/pmix/pmix.h | 13 +-
>  .../pmix1xx/pmix/include/pmix/pmix_common.h.in | 3 +-
>  .../pmix/pmix1xx/pmix/src/client/pmix_client_get.c | 1 +
>  opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am | 3 +-
>  .../mca/pmix/pmix1xx/pmix/src/server/pmix_server.c | 11 +-
>  .../pmix/pmix1xx/pmix/src/server/pmix_server_get.c | 552 +++++++++++++++++++++
>  .../pmix/pmix1xx/pmix/src/server/pmix_server_ops.c | 407 +--------------
>  .../pmix/pmix1xx/pmix/src/server/pmix_server_ops.h | 5 +-
>  opal/mca/pmix/pmix1xx/pmix/src/util/error.c | 2 +
>  opal/mca/pmix/pmix1xx/pmix/src/util/error.h | 10 +-
>  opal/mca/pmix/pmix1xx/pmix1_client.c | 1 +
>  opal/mca/pmix/pmix1xx/pmix1_server_south.c | 25 +-
>  orte/orted/pmix/pmix_server.c | 20 +-
>  orte/orted/pmix/pmix_server_fence.c | 6 +
>  16 files changed, 620 insertions(+), 457 deletions(-)
>  create mode 100644 opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
>
>
> hooks/post-receive
> --
> open-mpi/ompi
> _______________________________________________
> ompi-commits mailing list
> ompi-comm...@open-mpi.org
> http://www.open-mpi.org/mailman/listinfo.cgi/ompi-commits
>
> _______________________________________________
> devel mailing list
> de...@open-mpi.org
> Subscription: http://www.open-mpi.org/mailman/listinfo.cgi/devel
> Link to this post: http://www.open-mpi.org/community/lists/devel/2015/10/18297.php
>
> _______________________________________________
> devel mailing list
> de...@open-mpi.org
> Subscription: http://www.open-mpi.org/mailman/listinfo.cgi/devel
> Link to this post: http://www.open-mpi.org/community/lists/devel/2015/10/18299.php
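
For anyone following the error.h hunk quoted above: the new PMIX_ERROR_LOG wraps the log call in do/while(0) and skips it when the status is PMIX_ERR_SILENT. Here is a minimal standalone sketch of that pattern, not the PMIx code itself. Every DEMO_* name, demo_error_string, demo_report, and the numeric status values are hypothetical, and fprintf stands in for pmix_output. One deliberate difference from the patch: the sketch leaves out the semicolon after while (0), because with it the macro would not compile inside the unbraced if/else used in demo_report.

```c
#include <stdio.h>

/* Hypothetical status codes for illustration only; the real values
 * are defined in pmix_common.h and are not reproduced here. */
typedef int demo_status_t;
#define DEMO_SUCCESS      0
#define DEMO_ERROR       -1
#define DEMO_ERR_SILENT  -2

/* Counts how many messages the macro actually emitted. */
static int demo_log_count = 0;

/* Hypothetical stand-in for PMIx_Error_string(). */
static const char *demo_error_string(demo_status_t rc)
{
    switch (rc) {
    case DEMO_SUCCESS:    return "SUCCESS";
    case DEMO_ERR_SILENT: return "SILENT";
    case DEMO_ERROR:      return "ERROR";
    default:              return "UNKNOWN";
    }
}

/* Log unless the status is the "silent" code. There is no trailing
 * semicolon after while (0), so the caller's own ';' completes the
 * statement. Note that (r) is evaluated twice, so callers should
 * pass a plain value rather than an expression with side effects. */
#define DEMO_ERROR_LOG(r)                                              \
    do {                                                               \
        if (DEMO_ERR_SILENT != (r)) {                                  \
            fprintf(stderr, "DEMO ERROR: %s in file %s at line %d\n", \
                    demo_error_string((r)), __FILE__, __LINE__);       \
            ++demo_log_count;                                          \
        }                                                              \
    } while (0)

/* Returns the number of messages logged for this status. The
 * unbraced if/else is intentional: it only compiles because the
 * macro expands to a single statement. */
int demo_report(demo_status_t rc)
{
    int before = demo_log_count;

    if (DEMO_SUCCESS != rc)
        DEMO_ERROR_LOG(rc);
    else
        (void)0;

    return demo_log_count - before;
}
```

With these definitions, demo_report(DEMO_ERROR) writes one line to stderr and returns 1, while demo_report(DEMO_ERR_SILENT) and demo_report(DEMO_SUCCESS) write nothing and return 0.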