Gilles,

Supposing that pmix_common.h had already been included, adding it again should not have changed anything. I don't know which one is picked up, but now at least one pmix_common.h gets included.
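George's reasoning rests on the usual include-guard idiom: a header wrapped in `#ifndef`/`#define`/`#endif` contributes nothing the second time the preprocessor sees it in the same translation unit. So if adding the `#include` made the error go away, the earlier include path cannot have pulled in the header that declares `PMIX_ERR_SILENT`. The single-file sketch below assumes a conventional guard (the real guard macro in pmix_common.h is not shown in this thread) and uses a hypothetical name, `DEMO_PMIX_COMMON_H`. It pastes the same "header" body twice to show that the second copy is skipped.

```c
/* First "inclusion": the guard macro is not defined yet, so the body is processed. */
#ifndef DEMO_PMIX_COMMON_H
#define DEMO_PMIX_COMMON_H
enum { DEMO_ERR_SILENT = -2, DEMO_ERROR = -1, DEMO_SUCCESS = 0 };
#define DEMO_FIRST_COPY_SEEN 1
#endif

/* Second "inclusion" of identical text: the guard is already defined, so the
 * preprocessor drops the body. Without the guard, the enum below would be a
 * redeclaration error. */
#ifndef DEMO_PMIX_COMMON_H
#define DEMO_PMIX_COMMON_H
enum { DEMO_ERR_SILENT = -2, DEMO_ERROR = -1, DEMO_SUCCESS = 0 };
#define DEMO_SECOND_COPY_SEEN 1
#endif

/* Report which copies actually reached the compiler. */
int demo_first_copy_seen(void)
{
#ifdef DEMO_FIRST_COPY_SEEN
    return 1;
#else
    return 0;
#endif
}

int demo_second_copy_seen(void)
{
#ifdef DEMO_SECOND_COPY_SEEN
    return 1;
#else
    return 0;
#endif
}
```

Because the guard is keyed only on a macro name, which file actually gets picked depends on the spelling of the `#include` (`pmix_common.h` versus `pmix/pmix_common.h`) and on the `-I` search order, which is presumably why three same-named `pmix_server.h` files make this hard to follow.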
If you look carefully, you will notice that pmix_server.h includes pmix/pmix_common.h and not pmix_common.h. If you want to figure this one out, that's a good starting point. Btw, why do we have 3 headers with the same name (it's soooo confusing)?

George.

On Wed, Oct 28, 2015 at 1:08 AM, Gilles Gouaillardet <[email protected]> wrote: > George, > > pmix_common.h is #include'd by pmix_server.h > > well ... > pmix_common.h is #include'd by > opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h > and there are total 3 pmix_server.h > find . -name pmix_server.h > ./opal/mca/pmix/pmix1xx/pmix/include/pmix_server.h > ./opal/mca/pmix/pmix_server.h > ./orte/orted/pmix/pmix_server.h > > > i am using VPATH, i will give it an other try without VPATH > > so maybe the wrong pmix_server.h was #include'd ... and your fix hides the > real issue, > i will check this from now > > Cheers, > > Gilles > > On 10/28/2015 1:52 PM, George Bosilca wrote: > > Interesting, I wonder how your compiler gets to know the definition of the > PMIX_ERR_SILENT without the pmix_common.h. I just pushed a fix. > > George. > > > On Wed, Oct 28, 2015 at 12:43 AM, Gilles Gouaillardet <[email protected]> wrote: > >> George, >> >> i am unable to reproduce the issue. >> if build still breaks for you, could you send me your configure command >> line ?
>> >> Cheers, >> >> Gilles >> >> >> On 10/28/2015 1:04 PM, Gilles Gouaillardet wrote: >> >> George, >> >> PMIX_ERR_SILENT is defined in opal/mca/pmix/pmix1xx/pmix/include/pmix/ >> pmix_common.h.in >> >> i ll have a look at it from now >> >> Cheers, >> >> Gilles >> >> On 10/28/2015 12:02 PM, George Bosilca wrote: >> >> We get a nice compiler complaint: >> >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: >> In function 'pmix_server_get': >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: >> error: 'PMIX_ERR_SILENT' undeclared (first use in this function) >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: >> error: (Each undeclared identifier is reported only once >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131: >> error: for each function it appears in.) >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:117: >> warning: unused variable 'cd' >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c: >> In function '_process_dmdx_reply': >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:478: >> error: 'PMIX_ERR_SILENT' undeclared (first use in this function) >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: >> warning: unused variable 'xptr' >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: >> warning: unused variable 'pbkt' >> ../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460: >> warning: unused variable 'xfer' >> >> And he is right: >> >> $find . -name "*.h" -exec grep -Hn PMIX_ERR_SILENT {} + >> ./opal/mca/pmix/pmix1xx/pmix/src/util/error.h:33: if >> (PMIX_ERR_SILENT != (r)) { \ >> >> George. 
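The grep above finds only one header mention of `PMIX_ERR_SILENT`: inside a macro in src/util/error.h. Any file that expands that macro therefore needs the status enum from pmix_common.h in scope. The forwarded commit below adds `PMIX_ERR_SILENT` to that enum and uses it to "avoid error-logging twice". The sketch below is a minimal, hypothetical model of that pattern. The `DEMO_` names are invented, and the macro body is an assumption, since only its `if (PMIX_ERR_SILENT != (r))` line is quoted here. It also shows why the commit moves `PMIX_ERROR_MIN` from -41 to -42 when it adds an entry: the minimum must equal minus the number of non-zero entries so that success stays at 0.

```c
#include <stdio.h>

/* Hypothetical status enum: negative codes count up to DEMO_SUCCESS == 0.
 * DEMO_ERROR_MIN must be minus the number of non-zero entries (4 here). */
#define DEMO_ERROR_MIN (-4)
typedef enum {
    DEMO_ERR_NOT_FOUND = DEMO_ERROR_MIN, /* -4 */
    DEMO_ERR_NOMEM,                      /* -3 */
    DEMO_ERR_SILENT,                     /* -2: "already reported" marker */
    DEMO_ERROR,                          /* -1 */
    DEMO_SUCCESS                         /*  0 */
} demo_status_t;

static int demo_log_count = 0;

/* Log an error unless it is the silent marker (assumed macro shape). */
#define DEMO_ERROR_LOG(r)                                      \
    do {                                                       \
        if (DEMO_ERR_SILENT != (r)) {                          \
            fprintf(stderr, "error %d at %s:%d\n", (int)(r),   \
                    __FILE__, __LINE__);                       \
            demo_log_count++;                                  \
        }                                                      \
    } while (0)

/* Inner routine: logs the real error once, then returns SILENT so that
 * callers know it has already been reported. */
demo_status_t demo_unpack(int fail)
{
    if (fail) {
        demo_status_t rc = DEMO_ERR_NOMEM;
        DEMO_ERROR_LOG(rc);
        return DEMO_ERR_SILENT;
    }
    return DEMO_SUCCESS;
}

/* Outer routine: logs whatever comes back; this is a no-op for SILENT. */
demo_status_t demo_get(int fail)
{
    demo_status_t rc = demo_unpack(fail);
    if (DEMO_SUCCESS != rc) {
        DEMO_ERROR_LOG(rc);
    }
    return rc;
}

int demo_logged(void) { return demo_log_count; }
```

A failing `demo_get(1)` produces exactly one log line even though both levels call the logging macro.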
>> >> ---------- Forwarded message ---------- >> From: <[email protected]> >> Date: Tue, Oct 27, 2015 at 10:39 PM >> Subject: [OMPI commits] Git: open-mpi/ompi branch master updated. dev-2921-gb603307 >> To: [email protected] >> >> >> This is an automated email from the git hooks/post-receive script. It was >> generated because a ref change was pushed to the repository containing >> the project "open-mpi/ompi". >> >> The branch, master has been updated >> via b603307f7d33663ef6fe5941bb0d94bd2be017cb (commit) >> via 267ca8fcd3a59b780491d80d29e870061d8dac56 (commit) >> from 3035e140511b082c51ad66e116dd381a083a191d (commit) >> >> Those revisions listed above that are new to this repository have >> not appeared on any other notification email; so we list those >> revisions in full, below. >> >> - Log ----------------------------------------------------------------- >> >> https://github.com/open-mpi/ompi/commit/b603307f7d33663ef6fe5941bb0d94bd2be017cb >> >> commit b603307f7d33663ef6fe5941bb0d94bd2be017cb >> Merge: 3035e14 267ca8f >> Author: rhc54 <[email protected]> >> Date: Tue Oct 27 19:39:10 2015 -0700 >> >> Merge pull request #1073 from rhc54/topic/pmix >> >> Cleanup the PMIx direct modex support. >> >> >> >> >> https://github.com/open-mpi/ompi/commit/267ca8fcd3a59b780491d80d29e870061d8dac56 >> >> commit 267ca8fcd3a59b780491d80d29e870061d8dac56 >> Author: Ralph Castain <[email protected]> >> Date: Tue Oct 27 11:01:49 2015 -0700 >> >> Cleanup the PMIx direct modex support. Add an MCA parameter >> pmix_base_async_modex that will cause the async modex to be used when set >> to 1. Default it to 0 for now to continue current default behavior. >> >> Also add an MCA param pmix_base_collect_data to direct that the >> blocking fence shall return all data to each process. Obviously, this param >> has no effect if async_modex is used.
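The commit message describes two switches: `pmix_base_async_modex` (skip the blocking fence) and `pmix_base_collect_data` (have the fence return all data). The pmix.h hunk further down shows that the new `OPAL_MODEX()` always commits, and fences only when the async modex is off. The sketch below models that control flow with hypothetical stubs (`demo_pmix`, `demo_async_modex`, `demo_collect_all_data`) in place of the real `opal_pmix` module and MCA variables. It says nothing about what the real `commit`/`fence` do internally.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the opal_pmix module's function table. */
typedef struct {
    int (*commit)(void);
    int (*fence)(void *procs, int collect_data);
} demo_pmix_module_t;

static int demo_commits = 0, demo_fences = 0, demo_last_collect = -1;

static int demo_commit(void) { demo_commits++; return 0; }

static int demo_fence(void *procs, int collect_data)
{
    (void)procs;                 /* NULL: all procs in my job */
    demo_fences++;
    demo_last_collect = collect_data;
    return 0;
}

demo_pmix_module_t demo_pmix = { demo_commit, demo_fence };
bool demo_async_modex = false;     /* models pmix_base_async_modex (default 0) */
bool demo_collect_all_data = true; /* models pmix_base_collect_data */

/* Same shape as the new OPAL_MODEX(): always commit; run the blocking
 * fence (passing the collect flag) only when the async modex is off. */
#define DEMO_MODEX()                                       \
    do {                                                   \
        demo_pmix.commit();                                \
        if (!demo_async_modex) {                           \
            demo_pmix.fence(NULL, demo_collect_all_data);  \
        }                                                  \
    } while (0)

int demo_commit_count(void) { return demo_commits; }
int demo_fence_count(void) { return demo_fences; }
int demo_last_collect_flag(void) { return demo_last_collect; }
```

One design detail: the sketch ends the macro with `while (0)` and no semicolon. The quoted macro ends with `while(0);`, so `OPAL_MODEX();` expands to an extra empty statement, and `if (x) OPAL_MODEX(); else ...` would fail to compile.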
>> >> diff --git a/ompi/runtime/ompi_mpi_init.c b/ompi/runtime/ompi_mpi_init.c >> index 4c0391d..d0eebb2 100644 >> --- a/ompi/runtime/ompi_mpi_init.c >> +++ b/ompi/runtime/ompi_mpi_init.c >> @@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char **argv, int >> requested, int *provided) >> >> /* exchange connection info - this function may also act as a barrier >> * if data exchange is required. The modex occurs solely across procs >> - * in our job, so no proc array is passed. If a barrier is required, >> - * the "modex" function will perform it internally >> - */ >> - OPAL_MODEX(NULL, 1); >> + * in our job. If a barrier is required, the "modex" function will >> + * perform it internally */ >> + OPAL_MODEX(); >> >> OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier")); >> >> diff --git a/opal/mca/pmix/base/pmix_base_frame.c >> b/opal/mca/pmix/base/pmix_base_frame.c >> index e1ab766..6e8a347 100644 >> --- a/opal/mca/pmix/base/pmix_base_frame.c >> +++ b/opal/mca/pmix/base/pmix_base_frame.c >> @@ -31,12 +31,21 @@ >> /* Note that this initializer is important -- do not remove it! See >> https://github.com/open-mpi/ompi/issues/375 for details. 
*/ >> opal_pmix_base_module_t opal_pmix = { 0 }; >> -bool opal_pmix_collect_all_data = false; >> +bool opal_pmix_collect_all_data = true; >> bool opal_pmix_base_allow_delayed_server = false; >> int opal_pmix_verbose_output = -1; >> +bool opal_pmix_base_async_modex = false; >> >> static int opal_pmix_base_frame_register(mca_base_register_flag_t flags) >> { >> + opal_pmix_base_async_modex = false; >> + (void) mca_base_var_register("opal", "pmix", "base", "async_modex", >> "Use asynchronous modex mode", >> + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, >> OPAL_INFO_LVL_9, >> + MCA_BASE_VAR_SCOPE_READONLY, >> &opal_pmix_base_async_modex); >> + opal_pmix_collect_all_data = true; >> + (void) mca_base_var_register("opal", "pmix", "base", "collect_data", >> "Collect all data during modex", >> + MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, >> OPAL_INFO_LVL_9, >> + MCA_BASE_VAR_SCOPE_READONLY, >> &opal_pmix_collect_all_data); >> return OPAL_SUCCESS; >> } >> >> diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h >> index f265e01..7223529 100644 >> --- a/opal/mca/pmix/pmix.h >> +++ b/opal/mca/pmix/pmix.h >> @@ -36,6 +36,8 @@ BEGIN_C_DECLS >> /* provide access to the framework verbose output without >> * exposing the entire base */ >> extern int opal_pmix_verbose_output; >> +extern bool opal_pmix_collect_all_data; >> +extern bool opal_pmix_base_async_modex; >> extern int opal_pmix_base_exchange(opal_value_t *info, >> opal_pmix_pdata_t *pdat, >> int timeout); >> @@ -254,10 +256,13 @@ extern int opal_pmix_base_exchange(opal_value_t >> *info, >> * that takes into account directives and availability of >> * non-blocking operations >> */ >> -#define OPAL_MODEX(p, s) \ >> - do { \ >> - opal_pmix.commit(); \ >> - opal_pmix.fence((p), (s)); \ >> +#define OPAL_MODEX() \ >> + do { \ >> + opal_pmix.commit(); \ >> + if (!opal_pmix_base_async_modex) { \ >> + opal_pmix.fence(NULL, \ >> + opal_pmix_collect_all_data); \ >> + } \ >> } while(0); >> >> /** >> diff --git 
a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in >> b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in >> index 0216e34..5a111a1 100644 >> --- a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in >> +++ b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in >> @@ -183,7 +183,7 @@ BEGIN_C_DECLS >> >> /**** PMIX ERROR CONSTANTS ****/ >> /* PMIx errors are always negative, with 0 reserved for success */ >> -#define PMIX_ERROR_MIN -41 // set equal to number of non-zero entries >> in enum >> +#define PMIX_ERROR_MIN -42 // set equal to number of non-zero entries >> in enum >> >> typedef enum { >> PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN, >> @@ -230,6 +230,7 @@ typedef enum { >> PMIX_ERR_INVALID_CRED, >> PMIX_EXISTS, >> >> + PMIX_ERR_SILENT, >> PMIX_ERROR, >> PMIX_SUCCESS >> } pmix_status_t; >> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c >> b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c >> index d41be9c..b93ca6d 100644 >> --- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c >> +++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c >> @@ -458,6 +458,7 @@ static void getnb_cbfunc(struct pmix_peer_t *pr, >> pmix_usock_hdr_t *hdr, >> PMIX_RELEASE(bptr); // free's the data region >> if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { >> PMIX_ERROR_LOG(rc); >> + rc = PMIX_ERR_SILENT; // avoid error-logging twice >> break; >> } >> } >> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am >> b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am >> index 5422b78..88b0468 100644 >> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am >> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am >> @@ -16,4 +16,5 @@ sources += \ >> src/server/pmix_server.c \ >> src/server/pmix_server_ops.c \ >> src/server/pmix_server_regex.c \ >> - src/server/pmix_server_listener.c >> + src/server/pmix_server_listener.c \ >> + src/server/pmix_server_get.c >> diff --git 
a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c >> b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c >> index d16ae16..85f9e17 100644 >> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c >> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c >> @@ -141,8 +141,7 @@ static void _queue_message(int fd, short args, void >> *cbdata) >> pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata; >> pmix_usock_send_t *snd; >> pmix_output_verbose(2, pmix_globals.debug_output, >> - "[%s:%d] queue callback called: reply to %s:%d >> on tag %d," >> - "event_is_active=%d", >> + "[%s:%d] queue callback called: reply to %s:%d >> on tag %d", >> __FILE__, __LINE__, >> (queue->peer)->info->nptr->nspace, >> (queue->peer)->info->rank, (queue->tag), >> @@ -179,12 +178,10 @@ static void _queue_message(int fd, short args, void >> *cbdata) >> queue->buf = (b); \ >> queue->tag = (t); \ >> pmix_output_verbose(2, pmix_globals.debug_output, \ >> - "[%s:%d] queue reply to %s:%d on tag %d," \ >> - "event_is_active=%d", \ >> + "[%s:%d] queue reply to %s:%d on tag %d", \ >> __FILE__, __LINE__, \ >> (queue->peer)->info->nptr->nspace, \ >> - (queue->peer)->info->rank, (queue->tag), \ >> - (queue->peer)->send_ev_active); \ >> + (queue->peer)->info->rank, (queue->tag)); \ >> event_assign(&queue->ev, pmix_globals.evbase, -1, \ >> EV_WRITE, _queue_message, queue); \ >> event_priority_set(&queue->ev, 0); \ >> @@ -723,7 +720,7 @@ static void _register_client(int sd, short args, void >> *cbdata) >> * someone has been waiting for a request on a remote proc >> * in one of our nspaces, but we didn't know all the local procs >> * and so couldn't determine the proc was remote */ >> - pmix_pending_nspace_fix(nptr); >> + pmix_pending_nspace_requests(nptr); >> } >> /* let the caller know we are done */ >> if (NULL != cd->opcbfunc) { >> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c >> b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c >> new file mode 100644 >> 
index 0000000..2cb75cf >> --- /dev/null >> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c >> @@ -0,0 +1,552 @@ >> +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ >> +/* >> + * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. >> + * Copyright (c) 2014-2015 Research Organization for Information Science >> + * and Technology (RIST). All rights reserved. >> + * Copyright (c) 2014-2015 Artem Y. Polyakov <[email protected]>. >> + * All rights reserved. >> + * Copyright (c) 2015 Mellanox Technologies, Inc. >> + * All rights reserved. >> + * $COPYRIGHT$ >> + * >> + * Additional copyrights may follow >> + * >> + * $HEADER$ >> + */ >> + >> +#include <private/autogen/config.h> >> +#include <pmix/rename.h> >> +#include <private/types.h> >> +#include <private/pmix_stdint.h> >> +#include <private/pmix_socket_errno.h> >> + >> +#include <pmix_server.h> >> +#include "src/include/pmix_globals.h" >> + >> +#ifdef HAVE_STRING_H >> +#include <string.h> >> +#endif >> +#include <fcntl.h> >> +#ifdef HAVE_UNISTD_H >> +#include <unistd.h> >> +#endif >> +#ifdef HAVE_SYS_SOCKET_H >> +#include <sys/socket.h> >> +#endif >> +#ifdef HAVE_SYS_UN_H >> +#include <sys/un.h> >> +#endif >> +#ifdef HAVE_SYS_UIO_H >> +#include <sys/uio.h> >> +#endif >> +#ifdef HAVE_SYS_TYPES_H >> +#include <sys/types.h> >> +#endif >> +#include PMIX_EVENT_HEADER >> + >> +#include "src/class/pmix_list.h" >> +#include "src/buffer_ops/buffer_ops.h" >> +#include "src/util/argv.h" >> +#include "src/util/error.h" >> +#include "src/util/output.h" >> +#include "src/util/pmix_environ.h" >> +#include "src/util/progress_threads.h" >> +#include "src/usock/usock.h" >> +#include "src/sec/pmix_sec.h" >> + >> +#include "pmix_server_ops.h" >> + >> +extern pmix_server_module_t pmix_host_server; >> + >> +typedef struct { >> + pmix_object_t super; >> + pmix_event_t ev; >> + pmix_status_t status; >> + const char *data; >> + size_t ndata; >> + pmix_dmdx_local_t *lcd; >> + pmix_release_cbfunc_t relcbfunc; 
>> + void *cbdata; >> +} pmix_dmdx_reply_caddy_t; >> +static void dcd_con(pmix_dmdx_reply_caddy_t *p) >> +{ >> + p->status = PMIX_ERROR; >> + p->ndata = 0; >> + p->lcd = NULL; >> + p->relcbfunc = NULL; >> + p->cbdata = NULL; >> +} >> +PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t, >> + pmix_object_t, dcd_con, NULL); >> + >> + >> +static void dmdx_cbfunc(pmix_status_t status, const char *data, >> + size_t ndata, void *cbdata, >> + pmix_release_cbfunc_t relfn, void *relcbdata); >> +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, >> + pmix_modex_cbfunc_t cbfunc, void >> *cbdata); >> +static pmix_status_t create_local_tracker(char nspace[], int rank, >> + pmix_info_t info[], size_t >> ninfo, >> + pmix_modex_cbfunc_t cbfunc, >> + void *cbdata, >> + pmix_dmdx_local_t **lcd); >> + >> + >> +/* declare a function whose sole purpose is to >> + * free data that we provided to our host server >> + * when servicing dmodex requests */ >> +static void relfn(void *cbdata) >> +{ >> + char *data = (char*)cbdata; >> + free(data); >> +} >> + >> + >> +pmix_status_t pmix_server_get(pmix_buffer_t *buf, >> + pmix_modex_cbfunc_t cbfunc, >> + void *cbdata) >> +{ >> + int32_t cnt; >> + pmix_status_t rc; >> + int rank; >> + char *cptr; >> + char nspace[PMIX_MAX_NSLEN+1]; >> + pmix_nspace_t *ns, *nptr; >> + pmix_info_t *info=NULL; >> + size_t ninfo=0; >> + pmix_dmdx_local_t *lcd, *cd; >> + pmix_rank_info_t *iptr; >> + pmix_hash_table_t *ht; >> + bool local; >> + >> + pmix_output_verbose(2, pmix_globals.debug_output, >> + "recvd GET"); >> + >> + /* setup */ >> + memset(nspace, 0, sizeof(nspace)); >> + >> + /* retrieve the nspace and rank of the requested proc */ >> + cnt = 1; >> + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt, >> PMIX_STRING))) { >> + PMIX_ERROR_LOG(rc); >> + return rc; >> + } >> + (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN); >> + free(cptr); >> + cnt = 1; >> + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt, >> PMIX_INT))) { >> 
+ PMIX_ERROR_LOG(rc); >> + return rc; >> + } >> + /* retrieve any provided info structs */ >> + cnt = 1; >> + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt, >> PMIX_SIZE))) { >> + PMIX_ERROR_LOG(rc); >> + return rc; >> + } >> + if (0 < ninfo) { >> + PMIX_INFO_CREATE(info, ninfo); >> + cnt = ninfo; >> + if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt, >> PMIX_INFO))) { >> + PMIX_ERROR_LOG(rc); >> + PMIX_INFO_FREE(info, ninfo); >> + return rc; >> + } >> + } >> + >> + /* find the nspace object for this client */ >> + nptr = NULL; >> + PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { >> + if (0 == strcmp(nspace, ns->nspace)) { >> + nptr = ns; >> + break; >> + } >> + } >> + >> + pmix_output_verbose(2, pmix_globals.debug_output, >> + "%s:%d EXECUTE GET FOR %s:%d", >> + pmix_globals.myid.nspace, >> + pmix_globals.myid.rank, nspace, rank); >> + >> + if (NULL == nptr || NULL == nptr->server) { >> + /* this is for an nspace we don't know about yet, so >> + * record the request for data from this process and >> + * give the host server a chance to tell us about it */ >> + rc = create_local_tracker(nspace, rank, info, ninfo, >> + cbfunc, cbdata, &lcd); >> + return rc; >> + } >> + >> + /* We have to wait for all local clients to be registered before >> + * we can know whether this request is for data from a local or a >> + * remote client because one client might ask for data about another >> + * client that the host RM hasn't told us about yet. 
Fortunately, >> + * we do know how many clients to expect, so first check to see if >> + * all clients have been registered with us */ >> + if (!nptr->server->all_registered) { >> + /* we cannot do anything further, so just track this request >> + * for now */ >> + rc = create_local_tracker(nspace, rank, info, ninfo, >> + cbfunc, cbdata, &lcd); >> + return rc; >> + } >> + >> + /* Since we know about all the local clients in this nspace, >> + * let's first try to satisfy the request with any available data. >> + * By default, we assume we are looking for data from a remote >> + * client, and then check to see if this is one of my local >> + * clients - if so, then we look in that hash table */ >> + ht = &nptr->server->remote; >> + local = false; >> + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) { >> + if (iptr->rank == rank) { >> + /* it is known local client - check the local table */ >> + ht = &nptr->server->mylocal; >> + local = true; >> + break; >> + } >> + } >> + >> + /* see if we already have this data */ >> + rc = _satisfy_request(ht, rank, cbfunc, cbdata); >> + if( PMIX_SUCCESS == rc ){ >> + /* request was successfully satisfied */ >> + PMIX_INFO_FREE(info, ninfo); >> + return rc; >> + } >> + >> + /* If we get here, then we don't have the data at this time. 
Check >> + * to see if we already have a pending request for the data - if >> + * we do, then we can just wait for it to arrive */ >> + rc = create_local_tracker(nspace, rank, info, ninfo, >> + cbfunc, cbdata, &lcd); >> + if (PMIX_SUCCESS == rc) { >> + /* we are already waiting for the data - nothing more >> + * for us to do as the function added the new request >> + * to the tracker for us */ >> + return PMIX_SUCCESS; >> + } >> + if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) { >> + /* we have a problem - e.g., out of memory */ >> + return rc; >> + } >> + >> + /* Getting here means that we didn't already have a request for >> + * for data pending, and so we created a new tracker for this >> + * request. We know the identity of all our local clients, so >> + * if this is one, then we have nothing further to do - we will >> + * fulfill the request once the process commits its data */ >> + if (local) { >> + return PMIX_SUCCESS; >> + } >> + >> + /* this isn't a local client of ours, so we need to ask the host >> + * resource manager server to please get the info for us from >> + * whomever is hosting the target process */ >> + if (NULL != pmix_host_server.direct_modex) { >> + rc = pmix_host_server.direct_modex(&lcd->proc, info, ninfo, >> dmdx_cbfunc, lcd); >> + } else { >> + /* if we don't have direct modex feature, just respond with "not >> found" */ >> + cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL); >> + PMIX_INFO_FREE(info, ninfo); >> + pmix_list_remove_item(&pmix_server_globals.local_reqs, >> &lcd->super); >> + PMIX_LIST_DESTRUCT(&lcd->loc_reqs); >> + PMIX_RELEASE(lcd); >> + rc = PMIX_ERR_NOT_FOUND; >> + } >> + >> + return rc; >> +} >> + >> +static pmix_status_t create_local_tracker(char nspace[], int rank, >> + pmix_info_t info[], size_t >> ninfo, >> + pmix_modex_cbfunc_t cbfunc, >> + void *cbdata, >> + pmix_dmdx_local_t **ld) >> +{ >> + pmix_dmdx_local_t *lcd, *cd; >> + pmix_dmdx_request_t *req; >> + pmix_status_t rc; >> + >> + /* define default */ >> + 
*ld = NULL; >> + >> + /* see if we already have an existing request for data >> + * from this namespace/rank */ >> + lcd = NULL; >> + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, >> pmix_dmdx_local_t) { >> + if (0 != strncmp(nspace, cd->proc.nspace, PMIX_MAX_NSLEN) || >> + rank != cd->proc.rank ) { >> + continue; >> + } >> + lcd = cd; >> + break; >> + } >> + if (NULL != lcd) { >> + /* we already have a request, so just track that someone >> + * else wants data from the same target */ >> + rc = PMIX_SUCCESS; // indicates we found an existing request >> + goto complete; >> + } >> + /* we do not have an existing request, so let's create >> + * one and add it to our list */ >> + lcd = PMIX_NEW(pmix_dmdx_local_t); >> + if (NULL == lcd){ >> + PMIX_INFO_FREE(info, ninfo); >> + return PMIX_ERR_NOMEM; >> + } >> + strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN); >> + lcd->proc.rank = rank; >> + lcd->info = info; >> + lcd->ninfo = ninfo; >> + pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super); >> + rc = PMIX_ERR_NOT_FOUND; // indicates that we created a new request >> tracker >> + >> + complete: >> + /* track this specific requestor so we return the >> + * data to them */ >> + req = PMIX_NEW(pmix_dmdx_request_t); >> + req->cbfunc = cbfunc; >> + req->cbdata = cbdata; >> + pmix_list_append(&lcd->loc_reqs, &req->super); >> + *ld = lcd; >> + return rc; >> +} >> + >> +void pmix_pending_nspace_requests(pmix_nspace_t *nptr) >> +{ >> + pmix_dmdx_local_t *cd, *cd_next; >> + >> + /* Now that we know all local ranks, go along request list and ask >> for remote data >> + * for the non-local ranks, and resolve all pending requests for >> local procs >> + * that were waiting for registration to complete >> + */ >> + PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs, >> pmix_dmdx_local_t) { >> + pmix_rank_info_t *info; >> + bool found = false; >> + >> + if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN) >> ) { >> + continue; >> + } >> + 
>> + PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) { >> + if (info->rank == cd->proc.rank) { >> + found = true; // we will satisy this request upon >> commit from new proc >> + break; >> + } >> + } >> + >> + /* if not found - this is remote process and we need to send >> + * corresponding direct modex request */ >> + if( !found ){ >> + if( NULL != pmix_host_server.direct_modex ){ >> + pmix_host_server.direct_modex(&cd->proc, cd->info, >> cd->ninfo, dmdx_cbfunc, cd); >> + } else { >> + pmix_dmdx_request_t *req, *req_next; >> + PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs, >> pmix_dmdx_request_t) { >> + req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, >> req->cbdata, NULL, NULL); >> + pmix_list_remove_item(&cd->loc_reqs, &req->super); >> + PMIX_RELEASE(req); >> + } >> + pmix_list_remove_item(&pmix_server_globals.local_reqs, >> &cd->super); >> + PMIX_RELEASE(cd); >> + } >> + } >> + } >> +} >> + >> +static pmix_status_t _satisfy_request(pmix_hash_table_t *ht, int rank, >> + pmix_modex_cbfunc_t cbfunc, void >> *cbdata) >> +{ >> + pmix_status_t rc; >> + pmix_value_t *val; >> + char *data; >> + size_t sz; >> + pmix_buffer_t xfer, pbkt, *xptr; >> + >> + /* check to see if this data already has been >> + * obtained as a result of a prior direct modex request from >> + * a remote peer, or due to data from a local client >> + * having been committed */ >> + rc = pmix_hash_fetch(ht, rank, "modex", &val); >> + if (PMIX_SUCCESS == rc && NULL != val) { >> + /* the client is expecting this to arrive as a byte object >> + * containing a buffer, so package it accordingly */ >> + PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); >> + PMIX_CONSTRUCT(&xfer, pmix_buffer_t); >> + xptr = &xfer; >> + PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size); >> + pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER); >> + xfer.base_ptr = NULL; // protect the passed data >> + xfer.bytes_used = 0; >> + PMIX_DESTRUCT(&xfer); >> + PMIX_UNLOAD_BUFFER(&pbkt, data, sz); >> + 
PMIX_DESTRUCT(&pbkt); >> + PMIX_VALUE_RELEASE(val); >> + /* pass it back */ >> + cbfunc(rc, data, sz, cbdata, relfn, data); >> + return rc; >> + } >> + return PMIX_ERR_NOT_FOUND; >> +} >> + >> +/* Resolve pending requests to this namespace/rank */ >> +pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank, >> + pmix_status_t status, >> pmix_dmdx_local_t *lcd) >> +{ >> + pmix_dmdx_local_t *cd; >> + >> + /* find corresponding request (if exists) */ >> + if (NULL == lcd && NULL != nptr) { >> + PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs, >> pmix_dmdx_local_t) { >> + if (0 != strncmp(nptr->nspace, cd->proc.nspace, >> PMIX_MAX_NSLEN) || >> + rank != cd->proc.rank) { >> + continue; >> + } >> + lcd = cd; >> + break; >> + } >> + } >> + >> + /* If somebody was interested in this rank */ >> + if (NULL != lcd) { >> + pmix_dmdx_request_t *req; >> + >> + if (PMIX_SUCCESS != status){ >> + /* if we've got an error for this request - just forward it*/ >> + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { >> + /* if we can't satisfy this request - respond with error >> */ >> + req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL); >> + } >> + } else if (NULL != nptr) { >> + /* if we've got the blob - try to satisfy requests */ >> + pmix_hash_table_t *ht; >> + pmix_rank_info_t *iptr; >> + >> + /* by default we are looking for the remote data */ >> + ht = &nptr->server->remote; >> + /* check if this rank is local */ >> + PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, >> pmix_rank_info_t) { >> + if (iptr->rank == rank) { >> + ht = &nptr->server->mylocal; >> + break; >> + } >> + } >> + >> + /* run through all the requests to this rank */ >> + PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) { >> + pmix_status_t rc; >> + rc = _satisfy_request(ht, rank, req->cbfunc, >> req->cbdata); >> + if( PMIX_SUCCESS != rc ){ >> + /* if we can't satisfy this particular request >> (missing key?) 
*/ >> + req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL); >> + } >> + } >> + } >> + /* remove all requests to this rank and cleanup the >> corresponding structure */ >> + pmix_list_remove_item(&pmix_server_globals.local_reqs, >> (pmix_list_item_t*)lcd); >> + PMIX_RELEASE(lcd); >> + } >> + return PMIX_SUCCESS; >> +} >> + >> +/* process the returned data from the host RM server */ >> +static void _process_dmdx_reply(int fd, short args, void *cbdata) >> +{ >> + pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata; >> + pmix_kval_t *kp; >> + pmix_nspace_t *ns, *nptr; >> + pmix_status_t rc; >> + pmix_buffer_t xfer, pbkt, *xptr; >> + >> + pmix_output_verbose(2, pmix_globals.debug_output, >> + "[%s:%d] process dmdx reply from %s:%d", >> + __FILE__, __LINE__, >> + caddy->lcd->proc.nspace, caddy->lcd->proc.rank); >> + >> + /* find the nspace object for this client */ >> + nptr = NULL; >> + PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) { >> + if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) { >> + nptr = ns; >> + break; >> + } >> + } >> + >> + if (NULL == nptr) { >> + /* should be impossible */ >> + PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); >> + caddy->status = PMIX_ERR_NOT_FOUND; >> + goto cleanup; >> + } >> + >> + /* if the request was successfully satisfied, then store the data >> + * in our hash table for remote procs. Although we could immediately >> + * resolve any outstanding requests on our tracking list, we instead >> + * store the data first so we can immediately satisfy any future >> + * requests. 
Then, rather than duplicate the resolve code here, we
>> + * will let the pmix_pending_resolve function go ahead and retrieve
>> + * it from the hash table */
>> + if (PMIX_SUCCESS == caddy->status) {
>> + kp = PMIX_NEW(pmix_kval_t);
>> + kp->key = strdup("modex");
>> + PMIX_VALUE_CREATE(kp->value, 1);
>> + kp->value->type = PMIX_BYTE_OBJECT;
>> + /* we don't know if the host is going to save this data
>> + * or not, so we have to copy it - the client is expecting
>> + * this to arrive as a byte object containing a buffer, so
>> + * package it accordingly */
>> + kp->value->data.bo.bytes = malloc(caddy->ndata);
>> + memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
>> + kp->value->data.bo.size = caddy->ndata;
>> + /* store it in the appropriate hash */
>> + if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote,
>> caddy->lcd->proc.rank, kp))) {
>> + PMIX_ERROR_LOG(rc);
>> + }
>> + PMIX_RELEASE(kp); // maintain acctg
>> + }
>> +
>> + /* always execute the callback to avoid having the client hang */
>> + pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status,
>> caddy->lcd);
>> +
>> +cleanup:
>> + /* now call the release function so the host server
>> + * knows it can release the data */
>> + if (NULL != caddy->relcbfunc) {
>> + caddy->relcbfunc(caddy->cbdata);
>> + }
>> + PMIX_RELEASE(caddy);
>> +}
>> +
>> +/* this is the callback function that the host RM server will call
>> + * when it gets requested info back from a remote server */
>> +static void dmdx_cbfunc(pmix_status_t status,
>> + const char *data, size_t ndata, void *cbdata,
>> + pmix_release_cbfunc_t release_fn, void
>> *release_cbdata)
>> +{
>> + pmix_dmdx_reply_caddy_t *caddy;
>> +
>> + /* because the host RM is calling us from their own thread, we
>> + * need to thread-shift into our local progress thread before
>> + * accessing any global info */
>> + caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
>> + caddy->status = status;
>> + /* point to the callers cbfunc */
>> + caddy->relcbfunc = release_fn;
>> + caddy->cbdata = release_cbdata;
>> +
>> + /* point to the returned data and our own internal
>> + * tracker */
>> + caddy->data = data;
>> + caddy->ndata = ndata;
>> + caddy->lcd = (pmix_dmdx_local_t *)cbdata;
>> + pmix_output_verbose(2, pmix_globals.debug_output,
>> + "[%s:%d] queue dmdx reply for %s:%d",
>> + __FILE__, __LINE__,
>> + caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
>> + event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
>> + _process_dmdx_reply, caddy);
>> + event_priority_set(&caddy->ev, 0);
>> + event_active(&caddy->ev, EV_WRITE, 1);
>> +}
>> +
>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
>> b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
>> index 4a4abd1..43d35b5 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
>> @@ -58,246 +58,6 @@
>>
>> pmix_server_module_t pmix_host_server = {0};
>>
>> -static void dmdx_cbfunc(pmix_status_t status, const char *data,
>> - size_t ndata, void *cbdata,
>> - pmix_release_cbfunc_t relfn, void *relcbdata);
>> -
>> -typedef struct {
>> - pmix_object_t super;
>> - pmix_event_t ev;
>> - pmix_status_t status;
>> - const char *data;
>> - size_t ndata;
>> - pmix_dmdx_local_t *lcd;
>> - pmix_release_cbfunc_t relcbfunc;
>> - void *cbdata;
>> -} pmix_dmdx_reply_caddy_t;
>> -PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
>> - pmix_object_t, NULL, NULL);
>> -
>> -
>> -static void relfn(void *cbdata)
>> -{
>> - char *data = (char*)cbdata;
>> - free(data);
>> -}
>> -
>> -static pmix_status_t _satisfy_request(pmix_nspace_t *nptr, int rank,
>> - pmix_hash_table_t *ht,
>> - pmix_modex_cbfunc_t cbfunc,
>> void *cbdata)
>> -{
>> - pmix_status_t rc;
>> - pmix_buffer_t pbkt, xfer;
>> - pmix_value_t *val;
>> - char *data;
>> - size_t sz;
>> -
>> - /* check to see if this data already has been
>> - * obtained as a result of a prior direct modex request from
>> - * another local peer */
>> - rc = pmix_hash_fetch(ht, rank, "modex", &val);
>> - if (PMIX_SUCCESS == rc && NULL != val) {
>> - PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
>> - PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
>> - pmix_buffer_t *pxfer = &xfer;
>> - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes, val->data.bo.size);
>> - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
>> - xfer.base_ptr = NULL;
>> - xfer.bytes_used = 0;
>> - PMIX_DESTRUCT(&xfer);
>> - PMIX_VALUE_RELEASE(val);
>> - PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
>> - PMIX_DESTRUCT(&pbkt);
>> - /* pass it back */
>> - cbfunc(rc, data, sz, cbdata, relfn, data);
>> - return rc;
>> - }
>> - return PMIX_ERR_NOT_FOUND;
>> -}
>> -
>> -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
>> - pmix_info_t *info, size_t ninfo,
>> - pmix_modex_cbfunc_t cbfunc, void
>> *cbdata)
>> -{
>> - pmix_dmdx_local_t *lcd = NULL, *cd;
>> - pmix_rank_info_t *iptr;
>> - pmix_hash_table_t *ht;
>> - pmix_status_t rc;
>> -
>> - /* 1. Try to satisfy the request right now */
>> -
>> - /* by default we are looking for the remote data */
>> - ht = &nptr->server->remote;
>> - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks, pmix_rank_info_t) {
>> - if (iptr->rank == rank) {
>> - /* in case it is known local rank - check local table */
>> - ht = &nptr->server->mylocal;
>> - break;
>> - }
>> - }
>> -
>> - rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata);
>> - if( PMIX_SUCCESS == rc ){
>> - /* request was successfully satisfied */
>> - PMIX_INFO_FREE(info, ninfo);
>> - return rc;
>> - }
>> -
>> - /* 2. We were unable to satisfy request right now.
>> - * Look for existing requests to this namespace/rank */
>> - PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs,
>> pmix_dmdx_local_t) {
>> - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN)
>> ||
>> - rank != cd->proc.rank ) {
>> - continue;
>> - }
>> - lcd = cd;
>> - break;
>> - }
>> -
>> - /* 3. If no requests exists then:
>> - * - if all local clients are registered then we were called because
>> - * the remote data was requested. Create request and call direct
>> modex
>> - * to retrieve the data
>> - * - if not all local ranks were registered, we need to wait untill
>> - * pmix_pending_localy_fin would be called to resolve this. Just add
>> the
>> - * request for now.
>> - */
>> - if (NULL == lcd) {
>> - lcd = PMIX_NEW(pmix_dmdx_local_t);
>> - if (NULL == lcd){
>> - PMIX_INFO_FREE(info, ninfo);
>> - return PMIX_ERR_NOMEM;
>> - }
>> - strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
>> - lcd->proc.rank = rank;
>> - lcd->info = info;
>> - lcd->ninfo = ninfo;
>> - pmix_list_append(&pmix_server_globals.local_reqs, &lcd->super);
>> -
>> - /* check & send request if need/possible */
>> - if (nptr->server->all_registered && NULL == info) {
>> - if (NULL != pmix_host_server.direct_modex) {
>> - pmix_host_server.direct_modex(&lcd->proc, info, ninfo,
>> dmdx_cbfunc, lcd);
>> - } else {
>> - /* if we don't have direct modex feature, just respond
>> with "not found" */
>> - cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
>> - PMIX_INFO_FREE(info, ninfo);
>> - pmix_list_remove_item(&pmix_server_globals.local_reqs,
>> &lcd->super);
>> - PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
>> - PMIX_RELEASE(lcd);
>> - return PMIX_SUCCESS;
>> - }
>> - }
>> - }
>> - pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t);
>> - req->cbfunc = cbfunc;
>> - req->cbdata = cbdata;
>> - pmix_list_append(&lcd->loc_reqs, &req->super);
>> - return PMIX_SUCCESS;
>> -}
>> -
>> -void pmix_pending_nspace_fix(pmix_nspace_t *nptr)
>> -{
>> - pmix_dmdx_local_t *cd, *cd_next;
>> -
>> - /* Now when we know all local ranks, go along request list and ask
>> for remote data
>> - * for the non-local ranks, and resolve all pending requests for
>> local procs
>> - * that were waiting for registration to complete
>> - */
>> - PMIX_LIST_FOREACH_SAFE(cd, cd_next, &pmix_server_globals.local_reqs,
>> pmix_dmdx_local_t) {
>> - pmix_rank_info_t *info;
>> - bool found = false;
>> -
>> - if (0 != strncmp(nptr->nspace, cd->proc.nspace, PMIX_MAX_NSLEN)
>> ) {
>> - continue;
>> - }
>> -
>> - PMIX_LIST_FOREACH(info, &nptr->server->ranks, pmix_rank_info_t) {
>> - if (info->rank == cd->proc.rank) {
>> - found = true;
>> - break;
>> - }
>> - }
>> -
>> - /* if not found - this is remote process and we need to send
>> - * corresponding direct modex request */
>> - if( !found ){
>> - if( NULL != pmix_host_server.direct_modex ){
>> - pmix_host_server.direct_modex(&cd->proc, cd->info,
>> cd->ninfo, dmdx_cbfunc, cd);
>> - } else {
>> - pmix_dmdx_request_t *req, *req_next;
>> - PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs,
>> pmix_dmdx_request_t) {
>> - req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0,
>> req->cbdata, NULL, NULL);
>> - pmix_list_remove_item(&cd->loc_reqs, &req->super);
>> - PMIX_RELEASE(req);
>> - }
>> - pmix_list_remove_item(&pmix_server_globals.local_reqs,
>> &cd->super);
>> - PMIX_RELEASE(cd);
>> - }
>> - }
>> - }
>> -}
>> -
>> -/* Resolve pending requests to this namespace/rank */
>> -pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
>> - pmix_status_t status,
>> pmix_dmdx_local_t *lcd)
>> -{
>> - pmix_dmdx_local_t *cd;
>> -
>> - /* find corresponding request (if exists) */
>> - if( NULL == lcd ){
>> - PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs,
>> pmix_dmdx_local_t) {
>> - if (0 != strncmp(nptr->nspace, cd->proc.nspace,
>> PMIX_MAX_NSLEN) ||
>> - rank != cd->proc.rank) {
>> - continue;
>> - }
>> - lcd = cd;
>> - break;
>> - }
>> - }
>> -
>> - /* If somebody was interested in this rank */
>> - if( NULL != lcd ){
>> - pmix_dmdx_request_t *req;
>> -
>> - if (PMIX_SUCCESS != status){
>> - /* if we've got an error for this request - just forward it*/
>> - PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
>> - /* if we can't satisfy this request - respond with error
>> */
>> - req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
>> - }
>> - } else {
>> - /* if we've got the blob - try to satisfy requests */
>> - pmix_hash_table_t *ht;
>> - pmix_rank_info_t *iptr;
>> -
>> - /* by default we are looking for the remote data */
>> - ht = &nptr->server->remote;
>> - /* check if this rank is local */
>> - PMIX_LIST_FOREACH(iptr, &nptr->server->ranks,
>> pmix_rank_info_t) {
>> - if (iptr->rank == rank) {
>> - ht = &nptr->server->mylocal;
>> - break;
>> - }
>> - }
>> -
>> - /* run through all the requests to this rank */
>> - PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
>> - pmix_status_t rc;
>> - rc = _satisfy_request(nptr, rank, ht, req->cbfunc,
>> req->cbdata);
>> - if( PMIX_SUCCESS != rc ){
>> - /* if we can't satisfy this particular request
>> (missing key?) */
>> - req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
>> - }
>> - }
>> - }
>> - /* remove all requests to this rank and cleanup the
>> corresponding structure */
>> - pmix_list_remove_item(&pmix_server_globals.local_reqs,
>> (pmix_list_item_t*)lcd);
>> - PMIX_RELEASE(lcd);
>> - }
>> - return PMIX_SUCCESS;
>> -}
>> -
>> pmix_status_t pmix_server_abort(pmix_peer_t *peer, pmix_buffer_t *buf,
>> pmix_op_cbfunc_t cbfunc, void *cbdata)
>> {
>> @@ -436,13 +196,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer,
>> pmix_buffer_t *buf)
>> * may not be a contribution */
>> if (PMIX_SUCCESS == pmix_hash_fetch(&nptr->server->myremote,
>> info->rank, "modex", &val) &&
>> NULL != val) {
>> - PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
>> - PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes,
>> val->data.bo.size);
>> - pmix_buffer_t *pxfer = &xfer;
>> - pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
>> - xfer.base_ptr = NULL;
>> - xfer.bytes_used = 0;
>> - PMIX_DESTRUCT(&xfer);
>> + PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes,
>> val->data.bo.size);
>> PMIX_VALUE_RELEASE(val);
>> }
>> PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
>> @@ -457,7 +211,7 @@ pmix_status_t pmix_server_commit(pmix_peer_t *peer,
>> pmix_buffer_t *buf)
>> PMIX_RELEASE(dcd);
>> }
>> }
>> - /* see if anyone local is waiting on this data- could be more than
>> one */
>> + /* see if anyone local is waiting on this data - could be more than
>> one */
>> return pmix_pending_resolve(nptr, info->rank, PMIX_SUCCESS, NULL);
>> }
>>
>> @@ -826,163 +580,6 @@ pmix_status_t pmix_server_fence(pmix_server_caddy_t
>> *cd,
>> return rc;
>> }
>>
>> -static void _process_dmdx_reply(int fd, short args, void *cbdata)
>> -{
>> - pmix_dmdx_reply_caddy_t *caddy = (pmix_dmdx_reply_caddy_t *)cbdata;
>> - pmix_kval_t *kp;
>> - pmix_nspace_t *ns, *nptr;
>> - pmix_status_t rc;
>> -
>> - pmix_output_verbose(2, pmix_globals.debug_output,
>> - "[%s:%d] queue dmdx reply from %s:%d",
>> - __FILE__, __LINE__,
>> - caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
>> -
>> - /* find the nspace object for this client */
>> - nptr = NULL;
>> - PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
>> - if (0 == strcmp(caddy->lcd->proc.nspace, ns->nspace)) {
>> - nptr = ns;
>> - break;
>> - }
>> - }
>> -
>> - if (NULL == nptr) {
>> - /* should be impossible */
>> - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
>> - caddy->status = PMIX_ERR_NOT_FOUND;
>> - goto cleanup;
>> - }
>> -
>> - if (PMIX_SUCCESS == caddy->status) {
>> - kp = PMIX_NEW(pmix_kval_t);
>> - kp->key = strdup("modex");
>> - PMIX_VALUE_CREATE(kp->value, 1);
>> - kp->value->type = PMIX_BYTE_OBJECT;
>> - /* we don't know if the host is going to save this data
>> - * or not, so we have to copy it */
>> - kp->value->data.bo.bytes = (char*)malloc(caddy->ndata);
>> - memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
>> - kp->value->data.bo.size = caddy->ndata;
>> - /* store it in the appropriate hash */
>> - if (PMIX_SUCCESS != (rc = pmix_hash_store(&nptr->server->remote,
>> caddy->lcd->proc.rank, kp))) {
>> - PMIX_ERROR_LOG(rc);
>> - }
>> - PMIX_RELEASE(kp); // maintain acctg
>> - }
>> -
>> -cleanup:
>> - /* always execute the callback to avoid having the client hang */
>> - pmix_pending_resolve(nptr, caddy->lcd->proc.rank, caddy->status,
>> caddy->lcd);
>> -
>> - /* now call the release function so the host server
>> - * knows it can release the data */
>> - if (NULL != caddy->relcbfunc) {
>> - caddy->relcbfunc(caddy->cbdata);
>> - }
>> - PMIX_RELEASE(caddy);
>> -}
>> -
>> -static void dmdx_cbfunc(pmix_status_t status,
>> - const char *data, size_t ndata, void *cbdata,
>> - pmix_release_cbfunc_t release_fn, void
>> *release_cbdata)
>> -{
>> - pmix_dmdx_reply_caddy_t *caddy;
>> - caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
>> - caddy->status = status;
>> - /* point to the callers cbfunc */
>> - caddy->relcbfunc = release_fn;
>> - caddy->cbdata = release_cbdata;
>> -
>> - caddy->data = data;
>> - caddy->ndata = ndata;
>> - caddy->lcd = (pmix_dmdx_local_t *)cbdata;
>> - pmix_output_verbose(2, pmix_globals.debug_output, "[%s:%d] queue
>> dmdx reply %s:%d",
>> - __FILE__, __LINE__,
>> - caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
>> - event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
>> - _process_dmdx_reply, caddy);
>> - event_priority_set(&caddy->ev, 0);
>> - event_active(&caddy->ev, EV_WRITE, 1);
>> -}
>> -
>> -pmix_status_t pmix_server_get(pmix_buffer_t *buf,
>> - pmix_modex_cbfunc_t cbfunc,
>> - void *cbdata)
>> -{
>> - int32_t cnt;
>> - pmix_status_t rc;
>> - int rank;
>> - char *cptr;
>> - char nspace[PMIX_MAX_NSLEN+1];
>> - pmix_nspace_t *ns, *nptr;
>> - pmix_info_t *info=NULL;
>> - size_t ninfo=0;
>> -
>> - pmix_output_verbose(2, pmix_globals.debug_output,
>> - "recvd GET");
>> -
>> - /* setup */
>> - memset(nspace, 0, sizeof(nspace));
>> -
>> - /* retrieve the nspace and rank of the requested proc */
>> - cnt = 1;
>> - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &cptr, &cnt,
>> PMIX_STRING))) {
>> - PMIX_ERROR_LOG(rc);
>> - return rc;
>> - }
>> - (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
>> - free(cptr);
>> - cnt = 1;
>> - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &rank, &cnt,
>> PMIX_INT))) {
>> - PMIX_ERROR_LOG(rc);
>> - return rc;
>> - }
>> - /* find the nspace object for this client */
>> - nptr = NULL;
>> - PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_nspace_t) {
>> - if (0 == strcmp(nspace, ns->nspace)) {
>> - nptr = ns;
>> - break;
>> - }
>> - }
>> -
>> - pmix_output_verbose(2, pmix_globals.debug_output,
>> - "%s:%d EXECUTE GET FOR %s:%d",
>> - pmix_globals.myid.nspace,
>> - pmix_globals.myid.rank, nspace, rank);
>> -
>> - /* retrieve any provided info structs */
>> - cnt = 1;
>> - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, &ninfo, &cnt,
>> PMIX_SIZE))) {
>> - PMIX_ERROR_LOG(rc);
>> - return rc;
>> - }
>> - if (0 < ninfo) {
>> - PMIX_INFO_CREATE(info, ninfo);
>> - cnt = ninfo;
>> - if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf, info, &cnt,
>> PMIX_INFO))) {
>> - PMIX_ERROR_LOG(rc);
>> - PMIX_INFO_FREE(info, ninfo);
>> - return rc;
>> - }
>> - }
>> -
>> - if (NULL == nptr) {
>> - /* this is for an nspace we don't know about yet, so
>> - * give the host server a chance to tell us about it */
>> - nptr = PMIX_NEW(pmix_nspace_t);
>> - (void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN);
>> - pmix_list_append(&pmix_globals.nspaces, &nptr->super);
>> - }
>> - /* if we don't have any ranks for this job, protect ourselves here */
>> - if (NULL == nptr->server) {
>> - nptr->server = PMIX_NEW(pmix_server_nspace_t);
>> - }
>> -
>> - return pmix_pending_request(nptr, rank, info, ninfo, cbfunc, cbdata);
>> -}
>> -
>> pmix_status_t pmix_server_publish(pmix_peer_t *peer,
>> pmix_buffer_t *buf,
>> pmix_op_cbfunc_t cbfunc, void *cbdata)
>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
>> b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
>> index c6279d5..9129b6b 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
>> @@ -183,10 +183,7 @@ void pmix_stop_listening(void);
>>
>> bool pmix_server_trk_update(pmix_server_trkr_t *trk);
>>
>> -pmix_status_t pmix_pending_request(pmix_nspace_t *nptr, int rank,
>> - pmix_info_t *info, size_t ninfo,
>> - pmix_modex_cbfunc_t cbfunc, void
>> *cbdata);
>> -void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
>> +void pmix_pending_nspace_requests(pmix_nspace_t *nptr);
>> pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr, int rank,
>> pmix_status_t status,
>> pmix_dmdx_local_t *lcd);
>>
>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
>> b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
>> index 8cc4bcd..90c42ed 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
>> @@ -123,6 +123,8 @@ const char* PMIx_Error_string(pmix_status_t errnum)
>> case PMIX_EXISTS:
>> return "EXISTS";
>>
>> + case PMIX_ERR_SILENT:
>> + return "SILENT";
>> case PMIX_ERROR:
>> return "ERROR";
>> case PMIX_SUCCESS:
>> diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
>> b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
>> index f72227a..e43ac47 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
>> +++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
>> @@ -28,9 +28,13 @@
>>
>> BEGIN_C_DECLS
>>
>> -#define PMIX_ERROR_LOG(r) \
>> - pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
>> - PMIx_Error_string((r)), __FILE__, __LINE__);
>> +#define PMIX_ERROR_LOG(r) \
>> + do { \
>> + if (PMIX_ERR_SILENT != (r)) { \
>> + pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
>> + PMIx_Error_string((r)), __FILE__, __LINE__); \
>> + } \
>> + }while(0);
>>
>> #define PMIX_REPORT_ERROR(e) \
>> do { \
>> diff --git a/opal/mca/pmix/pmix1xx/pmix1_client.c
>> b/opal/mca/pmix/pmix1xx/pmix1_client.c
>> index f1ba0d5..e9c50b7 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix1_client.c
>> +++ b/opal/mca/pmix/pmix1xx/pmix1_client.c
>> @@ -217,6 +217,7 @@ int pmix1_store_local(const opal_process_name_t
>> *proc, opal_value_t *val)
>> }
>> }
>> if (NULL == job) {
>> + OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
>> return OPAL_ERR_NOT_FOUND;
>> }
>> (void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
>> diff --git a/opal/mca/pmix/pmix1xx/pmix1_server_south.c
>> b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
>> index ae42de0..f0d0f11 100644
>> --- a/opal/mca/pmix/pmix1xx/pmix1_server_south.c
>> +++ b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
>> @@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t status, void
>> *cbdata)
>> }
>>
>> int pmix1_server_register_nspace(opal_jobid_t jobid,
>> - int nlocalprocs,
>> - opal_list_t *info,
>> - opal_pmix_op_cbfunc_t cbfunc,
>> - void *cbdata)
>> + int nlocalprocs,
>> + opal_list_t *info,
>> + opal_pmix_op_cbfunc_t cbfunc,
>> + void *cbdata)
>> {
>> opal_value_t *kv, *k2;
>> pmix_info_t *pinfo, *pmap;
>> @@ -168,10 +168,17 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
>> pmix_status_t rc;
>> pmix1_opcaddy_t *op;
>> opal_list_t *pmapinfo;
>> + opal_pmix1_jobid_trkr_t *job;
>>
>> /* convert the jobid */
>> (void)snprintf(nspace, PMIX_MAX_NSLEN,
>> opal_convert_jobid_to_string(jobid));
>>
>> + /* store this job in our list of known nspaces */
>> + job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
>> + (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
>> + job->jobid = jobid;
>> + opal_list_append(&mca_pmix_pmix1xx_component.jobids, &job->super);
>> +
>> /* convert the list to an array of pmix_info_t */
>> if (NULL != info) {
>> sz = opal_list_get_size(info);
>> @@ -220,10 +227,10 @@ int pmix1_server_register_nspace(opal_jobid_t jobid,
>>
>>
>> int pmix1_server_register_client(const opal_process_name_t *proc,
>> - uid_t uid, gid_t gid,
>> - void *server_object,
>> - opal_pmix_op_cbfunc_t cbfunc,
>> - void *cbdata)
>> + uid_t uid, gid_t gid,
>> + void *server_object,
>> + opal_pmix_op_cbfunc_t cbfunc,
>> + void *cbdata)
>> {
>> pmix_status_t rc;
>> pmix1_opcaddy_t *op;
>> @@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t status, char
>> *data, size_t sz, void *cbd
>> }
>>
>> int pmix1_server_dmodex(const opal_process_name_t *proc,
>> - opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
>> + opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
>> {
>> pmix1_opcaddy_t *op;
>> pmix_status_t rc;
>> diff --git a/orte/orted/pmix/pmix_server.c b/orte/orted/pmix/pmix_server.c
>> index 953145d..ee5582c 100644
>> --- a/orte/orted/pmix/pmix_server.c
>> +++ b/orte/orted/pmix/pmix_server.c
>> @@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int status,
>> orte_process_name_t* sender,
>> int rc, ret, room_num;
>> int32_t cnt;
>> opal_process_name_t target;
>> - opal_value_t kv;
>> pmix_server_req_t *req;
>> uint8_t *data = NULL;
>> int32_t ndata = 0;
>> @@ -542,29 +541,14 @@ static void pmix_server_dmdx_resp(int status,
>> orte_process_name_t* sender,
>> return;
>> }
>>
>> - /* if we got something, store the blobs locally so we can
>> - * meet any further requests without doing a remote fetch.
>> - * This must be done as a single blob for later retrieval */
>> - if (ORTE_SUCCESS == ret && NULL != data) {
>> - OBJ_CONSTRUCT(&kv, opal_value_t);
>> - kv.key = strdup("modex");
>> - kv.type = OPAL_BYTE_OBJECT;
>> - kv.data.bo.bytes = data;
>> - kv.data.bo.size = ndata;
>> - if (OPAL_SUCCESS != (rc = opal_pmix.store_local(&target, &kv))) {
>> - ORTE_ERROR_LOG(rc);
>> - }
>> - kv.data.bo.bytes = NULL; // protect the data
>> - kv.data.bo.size = 0;
>> - OBJ_DESTRUCT(&kv);
>> - }
>> -
>> /* check the request out of the tracking hotel */
>>
>> opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs,
>> room_num, (void**)&req);
>> /* return the returned data to the requestor */
>> if (NULL != req) {
>> if (NULL != req->mdxcbfunc) {
>> req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata,
>> relcbfunc, data);
>> + } else {
>> + free(data);
>> }
>> OBJ_RELEASE(req);
>> }
>> diff --git a/orte/orted/pmix/pmix_server_fence.c
>> b/orte/orted/pmix/pmix_server_fence.c
>> index b3b0e33..765c1c2 100644
>> --- a/orte/orted/pmix/pmix_server_fence.c
>> +++ b/orte/orted/pmix/pmix_server_fence.c
>> @@ -197,6 +197,12 @@ static void dmodex_req(int sd, short args, void
>> *cbdata)
>> goto callback;
>> }
>>
>> + /* if we are the host daemon, then this is a local request, so
>> + * just wait for the data to come in */
>> + if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
>> + return;
>> + }
>> +
>> /* construct a request message */
>> buf = OBJ_NEW(opal_buffer_t);
>> if (OPAL_SUCCESS != (rc = opal_dss.pack(buf, &req->target, 1,
>> OPAL_NAME))) {
>>
>>
>> -----------------------------------------------------------------------
>>
>> Summary of changes:
>> ompi/runtime/ompi_mpi_init.c | 7 +-
>> opal/mca/pmix/base/pmix_base_frame.c | 11 +-
>> opal/mca/pmix/pmix.h | 13 +-
>> .../pmix1xx/pmix/include/pmix/pmix_common.h.in | 3 +-
>> .../pmix/pmix1xx/pmix/src/client/pmix_client_get.c | 1 +
>> opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am | 3 +-
>> .../mca/pmix/pmix1xx/pmix/src/server/pmix_server.c | 11 +-
>> .../pmix/pmix1xx/pmix/src/server/pmix_server_get.c | 552 +++++++++++++++++++++
>> .../pmix/pmix1xx/pmix/src/server/pmix_server_ops.c | 407 +--------------
>> .../pmix/pmix1xx/pmix/src/server/pmix_server_ops.h | 5 +-
>> opal/mca/pmix/pmix1xx/pmix/src/util/error.c | 2 +
>> opal/mca/pmix/pmix1xx/pmix/src/util/error.h | 10 +-
>> opal/mca/pmix/pmix1xx/pmix1_client.c | 1 +
>> opal/mca/pmix/pmix1xx/pmix1_server_south.c | 25 +-
>> orte/orted/pmix/pmix_server.c | 20 +-
>> orte/orted/pmix/pmix_server_fence.c | 6 +
>> 16 files changed, 620 insertions(+), 457 deletions(-)
>> create mode 100644 opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
>>
>>
>> hooks/post-receive
>> --
>> open-mpi/ompi
>> _______________________________________________
>> ompi-commits mailing list
>> [email protected]
>> http://www.open-mpi.org/mailman/listinfo.cgi/ompi-commits
>>
>> _______________________________________________
>> devel mailing list
>> [email protected]
>> Subscription: http://www.open-mpi.org/mailman/listinfo.cgi/devel
>> Link to this post:
>> http://www.open-mpi.org/community/lists/devel/2015/10/18297.php
