This is puzzling. I cannot reproduce it either, not even with a fresh clone. Let's assume by now this was a false alarm.
George.

On Wed, Oct 28, 2015 at 2:01 AM, Gilles Gouaillardet <gil...@rist.or.jp> wrote:

> George,
>
> As I wrote, I cannot reproduce the issue, so I just had to guess.
> My best guess is that the wrong pmix_server.h is #include'd, so pmix_common.h is
> not even #include'd at all.
>
> I checked the include path with
> cd opal/mca/pmix/pmix1xx/pmix && make clean && make -n src/server/pmix_server_get.lo
> If you can reproduce the issue without your patch, can you post the output?
>
> In my environment, all paths are correct and in the right order, so it
> does not make any difference whether
> pmix_common.h or pmix/pmix_common.h is #include'd.
>
> Cheers,
>
> Gilles
>
>
> On 10/28/2015 2:22 PM, George Bosilca wrote:
>
> Gilles,
>
> Supposing that pmix_common.h had already been included, by adding it again
> I should have obtained nothing new. I don't know which one is picked up,
> but now there is at least one pmix_common.h to be included.
>
> If you look carefully you will notice that pmix_server.h
> includes pmix/pmix_common.h and not pmix_common.h. If you want to figure
> this one out, that's a good starting point. Btw, why do we have 3 headers with
> the same name (it's soooo confusing)?
>
> George.
>
>
> On Wed, Oct 28, 2015 at 1:08 AM, Gilles Gouaillardet <gil...@rist.or.jp> wrote:
>
>> George,
>>
>> pmix_common.h is #include'd by pmix_server.h
>>
>> well ...
>> pmix_common.h is #include'd by
>> opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h
>> and there are 3 pmix_server.h in total:
>> find . -name pmix_server.h
>> ./opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h
>> ./opal/mca/pmix/pmix_server.h
>> ./orte/orted/pmix/pmix_server.h
>>
>>
>> I am using VPATH, I will give it another try without VPATH.
>>
>> So maybe the wrong pmix_server.h was #include'd ... and your fix hides
>> the real issue.
>> I will check this now.
>>
>> Cheers,
>>
>> Gilles
>>
>> On 10/28/2015 1:52 PM, George Bosilca wrote:
>>
>> Interesting, I wonder how your compiler gets to know the definition of
>> PMIX_ERR_SILENT without pmix_common.h. I just pushed a fix.
>>
>> George.
>>
>>
>> On Wed, Oct 28, 2015 at 12:43 AM, Gilles Gouaillardet <gil...@rist.or.jp> wrote:
>>
>>> George,
>>>
>>> I am unable to reproduce the issue.
>>> If the build still breaks for you, could you send me your configure command
>>> line?
>>>
>>> Cheers,
>>>
>>> Gilles
>>>
>>>
>>> On 10/28/2015 1:04 PM, Gilles Gouaillardet wrote:
>>>
>>> George,
>>>
>>> PMIX_ERR_SILENT is defined in opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
>>>
>>> I'll have a look at it now.
>>>
>>> Cheers,
>>>
>>> Gilles
>>>
>>> On 10/28/2015 12:02 PM, George Bosilca wrote:
>>>
>>> We get a nice compiler complaint:
>>>
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: In function 'pmix_server_get':
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: 'PMIX_ERR_SILENT' undeclared (first use in this function)
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: (Each undeclared identifier is reported only once
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: error: for each function it appears in.)
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:117: warning: unused variable 'cd'
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: In function '_process_dmdx_reply':
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:478: error: 'PMIX_ERR_SILENT' undeclared (first use in this function)
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'xptr'
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'pbkt'
>>> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: warning: unused variable 'xfer'
>>>
>>> And it is right:
>>>
>>> $ find . -name "*.h" -exec grep -Hn PMIX_ERR_SILENT {} +
>>> ./opal/mca/pmix/pmix1xx/pmix/src/util/error.h:33: if (PMIX_ERR_SILENT != (r)) { \
>>>
>>> George.
>>>
>>> ---------- Forwarded message ----------
>>> From: <git...@crest.iu.edu>
>>> Date: Tue, Oct 27, 2015 at 10:39 PM
>>> Subject: [OMPI commits] Git: open-mpi/ompi branch master updated. dev-2921-gb603307
>>> To: ompi-comm...@open-mpi.org
>>>
>>>
>>> This is an automated email from the git hooks/post-receive script. It was
>>> generated because a ref change was pushed to the repository containing
>>> the project "open-mpi/ompi".
>>>
>>> The branch, master has been updated
>>>        via  b603307f7d33663ef6fe5941bb0d94bd2be017cb (commit)
>>>        via  267ca8fcd3a59b780491d80d29e870061d8dac56 (commit)
>>>       from  3035e140511b082c51ad66e116dd381a083a191d (commit)
>>>
>>> Those revisions listed above that are new to this repository have
>>> not appeared on any other notification email; so we list those
>>> revisions in full, below.
>>>
>>> - Log -----------------------------------------------------------------
>>>
>>> https://github.com/open-mpi/ompi/commit/b603307f7d33663ef6fe5941bb0d94bd2be017cb
>>>
>>> commit b603307f7d33663ef6fe5941bb0d94bd2be017cb
>>> Merge: 3035e14 267ca8f
>>> Author: rhc54 <r...@open-mpi.org>
>>> Date:   Tue Oct 27 19:39:10 2015 -0700
>>>
>>>     Merge pull request #1073 from rhc54/topic/pmix
>>>
>>>     Cleanup the PMIx direct modex support.
>>>
>>>
>>>
>>> https://github.com/open-mpi/ompi/commit/267ca8fcd3a59b780491d80d29e870061d8dac56
>>>
>>> commit 267ca8fcd3a59b780491d80d29e870061d8dac56
>>> Author: Ralph Castain <r...@open-mpi.org>
>>> Date:   Tue Oct 27 11:01:49 2015 -0700
>>>
>>>     Cleanup the PMIx direct modex support. Add an MCA parameter
>>>     pmix_base_async_modex that will cause the async modex to be used when set
>>>     to 1. Default it to 0 for now to continue current default behavior.
>>>
>>>     Also add an MCA param pmix_base_collect_data to direct that the
>>>     blocking fence shall return all data to each process. Obviously, this param
>>>     has no effect if async_modex is used.
>>>
>>> diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c
>>> index 4c0391d..d0eebb2 100644
>>> --- a/ompi/runtime/ompi_mpi_init.c
>>> +++ b/ompi/runtime/ompi_mpi_init.c
>>> @@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int requested, int *provided)
>>>
>>>      /* exchange connection info - this function may also act as a barrier
>>>       * if data exchange is required. The modex occurs solely across procs
>>> -     * in our job, so no proc array is passed. If a barrier is required,
>>> -     * the "modex" function will perform it internally
>>> -     */
>>> -    OPAL_MODEX(NULL, 1);
>>> +     * in our job. If a barrier is required, the "modex" function will
>>> +     * perform it internally */
>>> +    OPAL_MODEX();
>>>
>>>      OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
>>>
>>> diff --git a/opal/mca/pmix/base/pmix_base_frame.c b/opal/mca/pmix/base/pmix_base_frame.c
>>> index e1ab766..6e8a347 100644
>>> --- a/opal/mca/pmix/base/pmix_base_frame.c
>>> +++ b/opal/mca/pmix/base/pmix_base_frame.c
>>> @@ -31,12 +31,21 @@
>>>  /* Note that this initializer is important -- do not remove it!  See
>>>     https://github.com/open-mpi/ompi/issues/375 for details. */
>>>  opal_pmix_base_module_t opal_pmix = { 0 };
>>> -bool opal_pmix_collect_all_data = false;
>>> +bool opal_pmix_collect_all_data = true;
>>>  bool opal_pmix_base_allow_delayed_server = false;
>>>  int opal_pmix_verbose_output = -1;
>>> +bool opal_pmix_base_async_modex = false;
>>>
>>>  static int opal_pmix_base_frame_register(mca_base_register_flag_t flags)
>>>  {
>>> +    opal_pmix_base_async_modex = false;
>>> +    (void) mca_base_var_register("opal", "pmix", "base", "async_modex", "Use asynchronous modex mode",
>>> +                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
>>> +                                 MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
>>> +    opal_pmix_collect_all_data = true;
>>> +    (void) mca_base_var_register("opal", "pmix", "base", "collect_data", "Collect all data during modex",
>>> +                                 MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
>>> +                                 MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data);
>>>      return OPAL_SUCCESS;
>>>  }
>>>
>>> diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h
>>> index f265e01..7223529 100644
>>> --- a/opal/mca/pmix/pmix.h
>>> +++ b/opal/mca/pmix/pmix.h
>>> @@ -36,6 +36,8 @@ BEGIN_C_DECLS
>>>  /* provide access to the framework verbose output without
>>>   * exposing the entire base */
>>>  extern int opal_pmix_verbose_output;
>>> +extern bool opal_pmix_collect_all_data;
>>> +extern bool opal_pmix_base_async_modex;
>>>  extern int opal_pmix_base_exchange(opal_value_t *info,
>>>                                     opal_pmix_pdata_t *pdat,
>>>                                     int timeout);
>>> @@ -254,10 +256,13 @@ extern int opal_pmix_base_exchange(opal_value_t *info,
>>>   * that takes into account directives and availability of
>>>   * non-blocking operations
>>>   */
>>> -#define OPAL_MODEX(p, s)                        \
>>> -    do {                                        \
>>> -        opal_pmix.commit();                     \
>>> -        opal_pmix.fence((p), (s));              \
>>> +#define OPAL_MODEX()                            \
>>> +    do {                                        \
>>> +        opal_pmix.commit();                     \
>>> +        if (!opal_pmix_base_async_modex) {      \
>>> +            opal_pmix.fence(NULL,               \
>>> +                opal_pmix_collect_all_data);    \
>>> +        }                                       \
>>>      } while(0);
>>>
>>>  /**
>>> diff --git a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
>>> index 0216e34..5a111a1 100644
>>> --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
>>> +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
>>> @@ -183,7 +183,7 @@ BEGIN_C_DECLS
>>>
>>>  /****    PMIX ERROR CONSTANTS    ****/
>>>  /* PMIx errors are always negative, with 0 reserved for success */
>>> -#define PMIX_ERROR_MIN  -41  // set equal to number of non-zero entries in enum
>>> +#define PMIX_ERROR_MIN  -42  // set equal to number of non-zero entries in enum
>>>
>>>  typedef enum {
>>>      PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN,
>>> @@ -230,6 +230,7 @@ typedef enum {
>>>      PMIX_ERR_INVALID_CRED,
>>>      PMIX_EXISTS,
>>>
>>> +    PMIX_ERR_SILENT,
>>>      PMIX_ERROR,
>>>      PMIX_SUCCESS
>>>  } pmix_status_t;
>>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
>>> index d41be9c..b93ca6d 100644
>>> --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
>>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
>>> @@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
>>>              PMIX_RELEASE(bptr);  // free's the data region
>>>              if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
>>>                  PMIX_ERROR_LOG(rc);
>>> +                rc = PMIX_ERR_SILENT; // avoid error-logging twice
>>>                  break;
>>>              }
>>>          }
>>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
>>> index 5422b78..88b0468 100644
>>> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
>>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
>>> @@ -16,4 +16,5 @@ sources += \
>>>          src/server/pmix_server.c \
>>>          src/server/pmix_server_ops.c \
>>>          src/server/pmix_server_regex.c \
>>> -        src/server/pmix_server_listener.c
>>> +        src/server/pmix_server_listener.c \
>>> +        src/server/pmix_server_get.c
>>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
>>> index d16ae16..85f9e17 100644
>>> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
>>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
>>> @@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void *cbdata)
>>>      pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata;
>>>      pmix_usock_send_t *snd;
>>>      pmix_output_verbose(2, pmix_globals.debug_output,
>>> -                        "[%s:%d] queue callback called: reply to %s:%d on tag %d,"
>>> -                        "event_is_active=%d",
>>> +                        "[%s:%d] queue callback called: reply to %s:%d on tag %d",
>>>                          __FILE__, __LINE__,
>>>                          (queue->peer)->info->nptr->nspace,
>>>                          (queue->peer)->info->rank, (queue->tag),
>>> @@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void *cbdata)
>>>          queue->buf = (b);                                               \
>>>          queue->tag = (t);                                               \
>>>          pmix_output_verbose(2, pmix_globals.debug_output,               \
>>> -                            "[%s:%d] queue reply to %s:%d on tag %d,"   \
>>> -                            "event_is_active=%d",                       \
>>> +                            "[%s:%d] queue reply to %s:%d on tag %d",   \
>>>                              __FILE__, __LINE__,                         \
>>>                              (queue->peer)->info->nptr->nspace,          \
>>> -                            (queue->peer)->info->rank, (queue->tag),    \
>>> -                            (queue->peer)->send_ev_active);             \
>>> +                            (queue->peer)->info->rank, (queue->tag));   \
>>>          event_assign(&queue->ev, pmix_globals.evbase, -1,              \
>>>                       EV_WRITE, _queue_message, queue);                 \
>>>          event_priority_set(&queue->ev, 0);                             \
>>> @@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void *cbdata)
>>>           * someone has been waiting for a request on a remote proc
>>>           * in one of our nspaces, but we didn't know all the local procs
>>>           * and so couldn't determine the proc was remote */
>>> -        pmix_pending_nspace_fix(nptr);
>>> +        pmix_pending_nspace_requests(nptr);
>>>      }
>>>      /* let the caller know we are done */
>>>      if (NULL != cd->opcbfunc) {
>>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
>>> new file mode 100644
>>> index 0000000..2cb75cf
>>> --- /dev/null
>>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
>>> @@ -0,0 +1,552 @@
>>> +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
>>> +/*
>>> + * Copyright (c) 2014-2015 Intel, Inc.  All rights reserved.
>>> + * Copyright (c) 2014-2015 Research Organization for Information Science
>>> + *                         and Technology (RIST). All rights reserved.
>>> + * Copyright (c) 2014-2015 Artem Y. Polyakov <artpo...@gmail.com>.
>>> + *                         All rights reserved.
>>> + * Copyright (c) 2015 Mellanox Technologies, Inc.
>>> + *                    All rights reserved.
>>> + * $COPYRIGHT$
>>> + *
>>> + * Additional copyrights may follow
>>> + *
>>> + * $HEADER$
>>> + */
>>> +
>>> +#include <private/autogen/config.h>
>>> +#include <pmix/rename.h>
>>> +#include <private/types.h>
>>> +#include <private/pmix_stdint.h>
>>> +#include <private/pmix_socket_errno.h>
>>> +
>>> +#include <pmix_server.h>
>>> +#include "src/include/pmix_globals.h"
>>> +
>>> +#ifdef HAVE_STRING_H
>>> +#include <string.h>
>>> +#endif
>>> +#include <fcntl.h>
>>> +#ifdef HAVE_UNISTD_H
>>> +#include <unistd.h>
>>> +#endif
>>> +#ifdef HAVE_SYS_SOCKET_H
>>> +#include <sys/socket.h>
>>> +#endif
>>> +#ifdef HAVE_SYS_UN_H
>>> +#include <sys/un.h>
>>> +#endif
>>> +#ifdef HAVE_SYS_UIO_H
>>> +#include <sys/uio.h>
>>> +#endif
>>> +#ifdef HAVE_SYS_TYPES_H
>>> +#include <sys/types.h>
>>> +#endif
>>> +#include PMIX_EVENT_HEADER
>>> +
>>> +#include "src/class/pmix_list.h"
>>> +#include "src/buffer_ops/buffer_ops.h"
>>> +#include "src/util/argv.h"
>>> +#include "src/util/error.h"
>>> +#include "src/util/output.h"
>>> +#include "src/util/pmix_environ.h"
>>> +#include "src/util/progress_threads.h"
>>> +#include "src/usock/usock.h"
>>> +#include "src/sec/pmix_sec.h"
>>> +
>>> +#include "pmix_server_ops.h"
>>> +
>>> +extern pmix_server_module_t pmix_host_server;
>>> +
>>> +typedef struct {
>>> +    pmix_object_t super;
>>> +    pmix_event_t ev;
>>> +    pmix_status_t status;
>>> +    const char *data;
>>> +    size_t ndata;
>>> +    pmix_dmdx_local_t *lcd;
>>> +    pmix_release_cbfunc_t relcbfunc;
>>> +    void *cbdata;
>>> +} pmix_dmdx_reply_caddy_t;
>>> +static void dcd_con(pmix_dmdx_reply_caddy_t *p)
>>> +{
>>> +    p->status = PMIX_ERROR;
>>> +    p->ndata = 0;
>>> +    p->lcd = NULL;
>>> +    p->relcbfunc = NULL;
>>> +    p->cbdata = NULL;
>>> +}
>>> +PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
>>> +                   pmix_object_t, dcd_con, NULL);
>>> +
>>> +
>>> +static void dmdx_cbfunc(pmix_status_t status, const char *data,
>>> +                        size_t ndata, void *cbdata,
>>> +                        pmix_release_cbfunc_t relfn, void *relcbdata);
>>> +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
>>> +                                      pmix_modex_cbfunc_t cbfunc, void *cbdata);
>>> +static pmix_status_t create_local_tracker(char nspace[], int rank,
>>> +                                          pmix_info_t info[], size_t ninfo,
>>> +                                          pmix_modex_cbfunc_t cbfunc,
>>> +                                          void *cbdata,
>>> +                                          pmix_dmdx_local_t **lcd);
>>> +
>>> +
>>> +/* declare a function whose sole purpose is to
>>> + * free data that we provided to our host server
>>> + * when servicing dmodex requests */
>>> +static void relfn(void *cbdata)
>>> +{
>>> +    char *data = (char*)cbdata;
>>> +    free(data);
>>> +}
>>> +
>>> +
>>> +pmix_status_t pmix_server_get(pmix_buffer_t *buf,
>>> +                              pmix_modex_cbfunc_t cbfunc,
>>> +                              void *cbdata)
>>> +{
>>> +    int32_t cnt;
>>> +    pmix_status_t rc;
>>> +    int rank;
>>> +    char *cptr;
>>> +    char nspace[PMIX_MAX_NSLEN+1];
>>> +    pmix_nspace_t *ns, *nptr;
>>> +    pmix_info_t *info=NULL;
>>> +    size_t ninfo=0;
>>> +    pmix_dmdx_local_t *lcd, *cd;
>>> +    pmix_rank_info_t *iptr;
>>> +    pmix_hash_table_t *ht;
>>> +    bool local;
>>> +
>>> +    pmix_output_verbose(2, pmix_globals.debug_output,
>>> +                        "recvd GET");
>>> +
>>> +    /* setup */
>>> +    memset(nspace, 0, sizeof(nspace));
>>> +
>>> +    /* retrieve the nspace and rank of the requested proc */
>>> +    cnt = 1;
>>> +    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, PMIX_STRING))) {
>>> +        PMIX_ERROR_LOG(rc);
>>> +        return rc;
>>> +    }
>>> +    (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
>>> +    free(cptr);
>>> +    cnt = 1;
>>> +    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, PMIX_INT))) {
>>> +        PMIX_ERROR_LOG(rc);
>>> +        return rc;
>>> +    }
>>> +    /* retrieve any provided info structs */
>>> +    cnt = 1;
>>> +    if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, PMIX_SIZE))) {
>>> +        PMIX_ERROR_LOG(rc);
>>> +        return rc;
>>> +    }
>>> +    if (0 < ninfo) {
>>> +        PMIX_INFO_CREATE(info, ninfo);
>>> +        cnt = ninfo;
>>> +        if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, PMIX_INFO))) {
>>> +            PMIX_ERROR_LOG(rc);
>>> +            PMIX_INFO_FREE(info, ninfo);
>>> +            return rc;
>>> +        }
>>> +    }
>>> +
>>> +    /* find the nspace object for this client */
>>> +    nptr = NULL;
>>> +    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
>>> +        if (0 == strcmp(nspace, ns->nspace)) {
>>> +            nptr = ns;
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    pmix_output_verbose(2, pmix_globals.debug_output,
>>> +                        "%s:%d EXECUTE GET FOR %s:%d",
>>> +                        pmix_globals.myid.nspace,
>>> +                        pmix_globals.myid.rank, nspace, rank);
>>> +
>>> +    if (NULL == nptr || NULL == nptr->server) {
>>> +        /* this is for an nspace we don't know about yet, so
>>> +         * record the request for data from this process and
>>> +         * give the host server a chance to tell us about it */
>>> +        rc = create_local_tracker(nspace, rank, info, ninfo,
>>> +                                  cbfunc, cbdata, &lcd);
>>> +        return rc;
>>> +    }
>>> +
>>> +    /* We have to wait for all local clients to be registered before
>>> +     * we can know whether this request is for data from a local or a
>>> +     * remote client because one client might ask for data about another
>>> +     * client that the host RM hasn't told us about yet. Fortunately,
>>> +     * we do know how many clients to expect, so first check to see if
>>> +     * all clients have been registered with us */
>>> +    if (!nptr->server->all_registered) {
>>> +        /* we cannot do anything further, so just track this request
>>> +         * for now */
>>> +        rc = create_local_tracker(nspace, rank, info, ninfo,
>>> +                                  cbfunc, cbdata, &lcd);
>>> +        return rc;
>>> +    }
>>> +
>>> +    /* Since we know about all the local clients in this nspace,
>>> +     * let's first try to satisfy the request with any available data.
>>> +     * By default, we assume we are looking for data from a remote
>>> +     * client, and then check to see if this is one of my local
>>> +     * clients - if so, then we look in that hash table */
>>> +    ht = &nptr->server->remote;
>>> +    local = false;
>>> +    PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
>>> +        if (iptr->rank == rank) {
>>> +            /* it is known local client - check the local table */
>>> +            ht = &nptr->server->mylocal;
>>> +            local = true;
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    /* see if we already have this data */
>>> +    rc = _satisfy_request(ht, rank, cbfunc, cbdata);
>>> +    if( PMIX_SUCCESS == rc ){
>>> +        /* request was successfully satisfied */
>>> +        PMIX_INFO_FREE(info, ninfo);
>>> +        return rc;
>>> +    }
>>> +
>>> +    /* If we get here, then we don't have the data at this time. Check
>>> +     * to see if we already have a pending request for the data - if
>>> +     * we do, then we can just wait for it to arrive */
>>> +    rc = create_local_tracker(nspace, rank, info, ninfo,
>>> +                              cbfunc, cbdata, &lcd);
>>> +    if (PMIX_SUCCESS == rc) {
>>> +        /* we are already waiting for the data - nothing more
>>> +         * for us to do as the function added the new request
>>> +         * to the tracker for us */
>>> +        return PMIX_SUCCESS;
>>> +    }
>>> +    if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) {
>>> +        /* we have a problem - e.g., out of memory */
>>> +        return rc;
>>> +    }
>>> +
>>> +    /* Getting here means that we didn't already have a request for
>>> +     * for data pending, and so we created a new tracker for this
>>> +     * request. We know the identity of all our local clients, so
>>> +     * if this is one, then we have nothing further to do - we will
>>> +     * fulfill the request once the process commits its data */
>>> +    if (local) {
>>> +        return PMIX_SUCCESS;
>>> +    }
>>> +
>>> +    /* this isn't a local client of ours, so we need to ask the host
>>> +     * resource manager server to please get the info for us from
>>> +     * whomever is hosting the target process */
>>> +    if (NULL != pmix_host_server.direct_modex) {
>>> +        rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, dmdx_cbfunc, lcd);
>>> +    } else {
>>> +        /* if we don't have direct modex feature, just respond with "not found" */
>>> +        cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
>>> +        PMIX_INFO_FREE(info, ninfo);
>>> +        pmix_list_remove_item(&pmix_server_globals.local_reqs, &lcd->super);
>>> +        PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
>>> +        PMIX_RELEASE(lcd);
>>> +        rc = PMIX_ERR_NOT_FOUND;
>>> +    }
>>> +
>>> +    return rc;
>>> +}
>>> +
>>> +static pmix_status_t create_local_tracker(char nspace[], int rank,
>>> +                                          pmix_info_t info[], size_t ninfo,
>>> +                                          pmix_modex_cbfunc_t cbfunc,
>>> +                                          void *cbdata,
>>> +                                          pmix_dmdx_local_t **ld)
>>> +{
>>> +    pmix_dmdx_local_t *lcd, *cd;
>>> +    pmix_dmdx_request_t *req;
>>> +    pmix_status_t rc;
>>> +
>>> +    /* define default */
>>> +    *ld = NULL;
>>> +
>>> +    /* see if we already have an existing request for data
>>> +     * from this namespace/rank */
>>> +    lcd = NULL;
>>> +    PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
>>> +        if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
>>> +                rank != cd->proc.rank ) {
>>> +            continue;
>>> +        }
>>> +        lcd = cd;
>>> +        break;
>>> +    }
>>> +    if (NULL != lcd) {
>>> +        /* we already have a request, so just track that someone
>>> +         * else wants data from the same target */
>>> +        rc = PMIX_SUCCESS; // indicates we found an existing request
>>> +        goto complete;
>>> +    }
>>> +    /* we do not have an existing request, so let's create
>>> +     * one and add it to our list */
>>> +    lcd = PMIX_NEW(pmix_dmdx_local_t);
>>> +    if (NULL == lcd){
>>> +        PMIX_INFO_FREE(info, ninfo);
>>> +        return PMIX_ERR_NOMEM;
>>> +    }
>>> +    strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
>>> +    lcd->proc.rank = rank;
>>> +    lcd->info = info;
>>> +    lcd->ninfo = ninfo;
>>> +    pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
>>> +    rc = PMIX_ERR_NOT_FOUND;  // indicates that we created a new request tracker
>>> +
>>> +  complete:
>>> +    /* track this specific requestor so we return the
>>> +     * data to them */
>>> +    req = PMIX_NEW(pmix_dmdx_request_t);
>>> +    req->cbfunc = cbfunc;
>>> +    req->cbdata = cbdata;
>>> +    pmix_list_append(&lcd->loc_reqs, &req->super);
>>> +    *ld = lcd;
>>> +    return rc;
>>> +}
>>> +
>>> +void pmix_pending_nspace_requests(pmix_nspace_t *nptr)
>>> +{
>>> +    pmix_dmdx_local_t *cd, *cd_next;
>>> +
>>> +    /* Now that we know all local ranks, go along request list and ask for remote data
>>> +     * for the non-local ranks, and resolve all pending requests for local procs
>>> +     * that were waiting for registration to complete
>>> +     */
>>> +    PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
>>> +        pmix_rank_info_t *info;
>>> +        bool found = false;
>>> +
>>> +        if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ) {
>>> +            continue;
>>> +        }
>>> +
>>> +        PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
>>> +            if (info->rank == cd->proc.rank) {
>>> +                found = true;  // we will satisy this request upon commit from new proc
>>> +                break;
>>> +            }
>>> +        }
>>> +
>>> +        /* if not found - this is remote process and we need to send
>>> +         * corresponding direct modex request */
>>> +        if( !found ){
>>> +            if( NULL != pmix_host_server.direct_modex ){
>>> +                pmix_host_server.direct_modex(&cd->proc, cd->info, cd->ninfo, dmdx_cbfunc, cd);
>>> +            } else {
>>> +                pmix_dmdx_request_t *req, *req_next;
>>> +                PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, pmix_dmdx_request_t) {
>>> +                    req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata, NULL, NULL);
>>> +                    pmix_list_remove_item(&cd->loc_reqs, &req->super);
>>> +                    PMIX_RELEASE(req);
>>> +                }
>>> +                pmix_list_remove_item(&pmix_server_globals.local_reqs, &cd->super);
>>> +                PMIX_RELEASE(cd);
>>> +            }
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank,
>>> +                                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
>>> +{
>>> +    pmix_status_t rc;
>>> +    pmix_value_t *val;
>>> +    char *data;
>>> +    size_t sz;
>>> +    pmix_buffer_t xfer, pbkt, *xptr;
>>> +
>>> +    /* check to see if this data already has been
>>> +     * obtained as a result of a prior direct modex request from
>>> +     * a remote peer, or due to data from a local client
>>> +     * having been committed */
>>> +    rc = pmix_hash_fetch(ht, rank, "modex", &val);
>>> +    if (PMIX_SUCCESS == rc && NULL != val) {
>>> +        /* the client is expecting this to arrive as a byte object
>>> +         * containing a buffer, so package it accordingly */
>>> +        PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
>>> +        PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
>>> +        xptr = &xfer;
>>> +        PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
>>> +        pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER);
>>> +        xfer.base_ptr = NULL; // protect the passed data
>>> +        xfer.bytes_used = 0;
>>> +        PMIX_DESTRUCT(&xfer);
>>> +        PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
>>> +        PMIX_DESTRUCT(&pbkt);
>>> +        PMIX_VALUE_RELEASE(val);
>>> +        /* pass it back */
>>> +        cbfunc(rc, data, sz, cbdata, relfn, data);
>>> +        return rc;
>>> +    }
>>> +    return PMIX_ERR_NOT_FOUND;
>>> +}
>>> +
>>> +/* Resolve pending requests to this namespace/rank */
>>> +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
>>> +                                   pmix_status_t status, pmix_dmdx_local_t *lcd)
>>> +{
>>> +    pmix_dmdx_local_t *cd;
>>> +
>>> +    /* find corresponding request (if exists) */
>>> +    if (NULL == lcd && NULL != nptr) {
>>> +        PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
>>> +            if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) ||
>>> +                    rank != cd->proc.rank) {
>>> +                continue;
>>> +            }
>>> +            lcd = cd;
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    /* If somebody was interested in this rank */
>>> +    if (NULL != lcd) {
>>> +        pmix_dmdx_request_t *req;
>>> +
>>> +        if (PMIX_SUCCESS != status){
>>> +            /* if we've got an error for this request - just forward it*/
>>> +            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
>>> +                /* if we can't satisfy this request - respond with error */
>>> +                req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
>>> +            }
>>> +        } else if (NULL != nptr) {
>>> +            /* if we've got the blob - try to satisfy requests */
>>> +            pmix_hash_table_t *ht;
>>> +            pmix_rank_info_t *iptr;
>>> +
>>> +            /* by default we are looking for the remote data */
>>> +            ht = &nptr->server->remote;
>>> +            /* check if this rank is local */
>>> +            PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
>>> +                if (iptr->rank == rank) {
>>> +                    ht = &nptr->server->mylocal;
>>> +                    break;
>>> +                }
>>> +            }
>>> +
>>> +            /* run through all the requests to this rank */
>>> +            PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
>>> +                pmix_status_t rc;
>>> +                rc = _satisfy_request(ht, rank, req->cbfunc, req->cbdata);
>>> +                if( PMIX_SUCCESS != rc ){
>>> +                    /* if we can't satisfy this particular request (missing key?) */
>>> +                    req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
>>> +                }
>>> +            }
>>> +        }
>>> +        /* remove all requests to this rank and cleanup the corresponding structure */
>>> +        pmix_list_remove_item(&pmix_server_globals.local_reqs, (pmix_list_item_t*)lcd);
>>> +        PMIX_RELEASE(lcd);
>>> +    }
>>> +    return PMIX_SUCCESS;
>>> +}
>>> +
>>> +/* process the returned data from the host RM server */
>>> +static void _process_dmdx_reply(int fd, short args, void *cbdata)
>>> +{
>>> +    pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
>>> +    pmix_kval_t *kp;
>>> +    pmix_nspace_t *ns, *nptr;
>>> +    pmix_status_t rc;
>>> +    pmix_buffer_t xfer, pbkt, *xptr;
>>> +
>>> +    pmix_output_verbose(2, pmix_globals.debug_output,
>>> +                    "[%s:%d] process dmdx reply from %s:%d",
>>> +                    __FILE__, __LINE__,
>>> +                    caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
>>> +
>>> +    /* find the nspace object for this client */
>>> +    nptr = NULL;
>>> +    PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
>>> +        if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
>>> +            nptr = ns;
>>> +            break;
>>> +        }
>>> +    }
>>> +
>>> +    if (NULL == nptr) {
>>> +        /* should be impossible */
>>> +        PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
>>> +        caddy->status = PMIX_ERR_NOT_FOUND;
>>> +        goto cleanup;
>>> +    }
>>> +
>>> +    /* if the request was successfully satisfied, then store the data
>>> +     * in our hash table for remote procs. Although we could immediately
>>> +     * resolve any outstanding requests on our tracking list, we instead
>>> +     * store the data first so we can immediately satisfy any future
>>> +     * requests. Then, rather than duplicate the resolve code here, we
>>> +     * will let the pmix_pending_resolve function go ahead and retrieve
>>> +     * it from the hash table */
>>> +    if (PMIX_SUCCESS == caddy->status) {
>>> +        kp = PMIX_NEW(pmix_kval_t);
>>> +        kp->key = strdup("modex");
>>> +        PMIX_VALUE_CREATE(kp->value, 1);
>>> +        kp->value->type = PMIX_BYTE_OBJECT;
>>> +        /* we don't know if the host is going to save this data
>>> +         * or not, so we have to copy it - the client is expecting
>>> +         * this to arrive as a byte object containing a buffer, so
>>> +         * package it accordingly */
>>> +        kp->value->data.bo.bytes = malloc(caddy->ndata);
>>> +        memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
>>> +        kp->value->data.bo.size = caddy->ndata;
>>> +        /* store it in the appropriate hash */
>>> +        if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote, caddy->lcd->proc.rank, kp))) {
>>> +            PMIX_ERROR_LOG(rc);
>>> +        }
>>> +        PMIX_RELEASE(kp);  // maintain acctg
>>> +    }
>>> +
>>> +    /* always execute the callback to avoid having the client hang */
>>> +    pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status, caddy->lcd);
>>> +
>>> +cleanup:
>>> +    /* now call the release function so the host server
>>> +     * knows it can release the data */
>>> +    if (NULL != caddy->relcbfunc) {
>>> +        caddy->relcbfunc(caddy->cbdata);
>>> +    }
>>> +    PMIX_RELEASE(caddy);
>>>
>>> ...
>
> [Message clipped]
> _______________________________________________
> devel mailing list
> de...@open-mpi.org
> Subscription: http://www.open-mpi.org/mailman/listinfo.cgi/devel
> Link to this post:
> http://www.open-mpi.org/community/lists/devel/2015/10/18304.php
>