We get a nice compiler complaint:
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:
In function 'pmix_server_get':
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131:
error: 'PMIX_ERR_SILENT' undeclared (first use in this
function)
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131:
error: (Each undeclared identifier is reported only once
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:131:
error: for each function it appears in.)
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:117:
warning: unused variable 'cd'
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:
In function '_process_dmdx_reply':
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:478:
error: 'PMIX_ERR_SILENT' undeclared (first use in this
function)
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460:
warning: unused variable 'xptr'
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460:
warning: unused variable 'pbkt'
../../../../../../ompi/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c:460:
warning: unused variable 'xfer'
And the compiler is right:
$find . -name "*.h" -exec grep -Hn PMIX_ERR_SILENT {} +
./opal/mca/pmix/pmix1xx/pmix/src/util/error.h:33: if
(PMIX_ERR_SILENT != (r)) { \
George.
---------- Forwarded message ----------
From: <git...@crest.iu.edu>
Date: Tue, Oct 27, 2015 at 10:39 PM
Subject: [OMPI commits] Git: open-mpi/ompi branch master
updated. dev-2921-gb603307
To: ompi-comm...@open-mpi.org
This is an automated email from the git hooks/post-receive
script. It was
generated because a ref change was pushed to the repository
containing
the project "open-mpi/ompi".
The branch, master has been updated
via b603307f7d33663ef6fe5941bb0d94bd2be017cb (commit)
via 267ca8fcd3a59b780491d80d29e870061d8dac56 (commit)
from 3035e140511b082c51ad66e116dd381a083a191d (commit)
Those revisions listed above that are new to this
repository have
not appeared on any other notification email; so we list those
revisions in full, below.
- Log
-----------------------------------------------------------------
https://github.com/open-mpi/ompi/commit/b603307f7d33663ef6fe5941bb0d94bd2be017cb
commit b603307f7d33663ef6fe5941bb0d94bd2be017cb
Merge: 3035e14 267ca8f
Author: rhc54 <r...@open-mpi.org>
Date: Tue Oct 27 19:39:10 2015 -0700
Merge pull request #1073 from rhc54/topic/pmix
Cleanup the PMIx direct modex support.
https://github.com/open-mpi/ompi/commit/267ca8fcd3a59b780491d80d29e870061d8dac56
commit 267ca8fcd3a59b780491d80d29e870061d8dac56
Author: Ralph Castain <r...@open-mpi.org>
Date: Tue Oct 27 11:01:49 2015 -0700
Cleanup the PMIx direct modex support. Add an MCA
parameter pmix_base_async_modex that will cause the async
modex to be used when set to 1. Default it to 0 for now
to continue current default behavior.
Also add an MCA param pmix_base_collect_data to direct
that the blocking fence shall return all data to each
process. Obviously, this param has no effect if
async_modex is used.
diff --git a/ompi/runtime/ompi_mpi_init.c
b/ompi/runtime/ompi_mpi_init.c
index 4c0391d..d0eebb2 100644
--- a/ompi/runtime/ompi_mpi_init.c
+++ b/ompi/runtime/ompi_mpi_init.c
@@ -639,10 +639,9 @@ int ompi_mpi_init(int argc, char
**argv, int requested, int *provided)
/* exchange connection info - this function may also
act as a barrier
* if data exchange is required. The modex occurs
solely across procs
- * in our job, so no proc array is passed. If a
barrier is required,
- * the "modex" function will perform it internally
- */
- OPAL_MODEX(NULL, 1);
+ * in our job. If a barrier is required, the "modex"
function will
+ * perform it internally */
+ OPAL_MODEX();
OPAL_TIMING_MNEXT((&tm,"time from modex to first barrier"));
diff --git a/opal/mca/pmix/base/pmix_base_frame.c
b/opal/mca/pmix/base/pmix_base_frame.c
index e1ab766..6e8a347 100644
--- a/opal/mca/pmix/base/pmix_base_frame.c
+++ b/opal/mca/pmix/base/pmix_base_frame.c
@@ -31,12 +31,21 @@
/* Note that this initializer is important -- do not
remove it! See
https://github.com/open-mpi/ompi/issues/375 for details. */
opal_pmix_base_module_t opal_pmix = { 0 };
-bool opal_pmix_collect_all_data = false;
+bool opal_pmix_collect_all_data = true;
bool opal_pmix_base_allow_delayed_server = false;
int opal_pmix_verbose_output = -1;
+bool opal_pmix_base_async_modex = false;
static int
opal_pmix_base_frame_register(mca_base_register_flag_t flags)
{
+ opal_pmix_base_async_modex = false;
+ (void) mca_base_var_register("opal", "pmix", "base",
"async_modex", "Use asynchronous modex mode",
+ MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
+ MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_base_async_modex);
+ opal_pmix_collect_all_data = true;
+ (void) mca_base_var_register("opal", "pmix", "base",
"collect_data", "Collect all data during modex",
+ MCA_BASE_VAR_TYPE_BOOL, NULL, 0, 0, OPAL_INFO_LVL_9,
+ MCA_BASE_VAR_SCOPE_READONLY, &opal_pmix_collect_all_data);
return OPAL_SUCCESS;
}
diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h
index f265e01..7223529 100644
--- a/opal/mca/pmix/pmix.h
+++ b/opal/mca/pmix/pmix.h
@@ -36,6 +36,8 @@ BEGIN_C_DECLS
/* provide access to the framework verbose output without
* exposing the entire base */
extern int opal_pmix_verbose_output;
+extern bool opal_pmix_collect_all_data;
+extern bool opal_pmix_base_async_modex;
extern int opal_pmix_base_exchange(opal_value_t *info,
opal_pmix_pdata_t *pdat,
int timeout);
@@ -254,10 +256,13 @@ extern int
opal_pmix_base_exchange(opal_value_t *info,
* that takes into account directives and availability of
* non-blocking operations
*/
-#define OPAL_MODEX(p, s) \
- do { \
- opal_pmix.commit(); \
- opal_pmix.fence((p), (s)); \
+#define OPAL_MODEX() \
+ do { \
+ opal_pmix.commit(); \
+ if (!opal_pmix_base_async_modex) { \
+ opal_pmix.fence(NULL, \
+ opal_pmix_collect_all_data); \
+ } \
} while(0);
/**
diff --git
a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
index 0216e34..5a111a1 100644
---
a/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
+++
b/opal/mca/pmix/pmix1xx/pmix/include/pmix/pmix_common.h.in
@@ -183,7 +183,7 @@ BEGIN_C_DECLS
/**** PMIX ERROR CONSTANTS ****/
/* PMIx errors are always negative, with 0 reserved for
success */
-#define PMIX_ERROR_MIN -41 // set equal to number of
non-zero entries in enum
+#define PMIX_ERROR_MIN -42 // set equal to number of
non-zero entries in enum
typedef enum {
PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER = PMIX_ERROR_MIN,
@@ -230,6 +230,7 @@ typedef enum {
PMIX_ERR_INVALID_CRED,
PMIX_EXISTS,
+ PMIX_ERR_SILENT,
PMIX_ERROR,
PMIX_SUCCESS
} pmix_status_t;
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
index d41be9c..b93ca6d 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
+++ b/opal/mca/pmix/pmix1xx/pmix/src/client/pmix_client_get.c
@@ -458,6 +458,7 @@ static void getnb_cbfunc(struct
pmix_peer_t *pr, pmix_usock_hdr_t *hdr,
PMIX_RELEASE(bptr); // free's the data region
if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
PMIX_ERROR_LOG(rc);
+ rc = PMIX_ERR_SILENT; // avoid error-logging twice
break;
}
}
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
index 5422b78..88b0468 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
+++ b/opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am
@@ -16,4 +16,5 @@ sources += \
src/server/pmix_server.c \
src/server/pmix_server_ops.c \
src/server/pmix_server_regex.c \
- src/server/pmix_server_listener.c
+ src/server/pmix_server_listener.c \
+ src/server/pmix_server_get.c
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
index d16ae16..85f9e17 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
+++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server.c
@@ -141,8 +141,7 @@ static void _queue_message(int fd,
short args, void *cbdata)
pmix_usock_queue_t *queue = (pmix_usock_queue_t*)cbdata;
pmix_usock_send_t *snd;
pmix_output_verbose(2, pmix_globals.debug_output,
- "[%s:%d] queue callback called: reply to %s:%d on tag %d,"
- "event_is_active=%d",
+ "[%s:%d] queue callback called: reply to %s:%d on tag %d",
__FILE__, __LINE__,
(queue->peer)->info->nptr->nspace,
(queue->peer)->info->rank, (queue->tag),
@@ -179,12 +178,10 @@ static void _queue_message(int fd,
short args, void *cbdata)
queue->buf = (b); \
queue->tag = (t); \
pmix_output_verbose(2, pmix_globals.debug_output,
\
- "[%s:%d] queue reply to %s:%d on tag %d," \
- "event_is_active=%d", \
+ "[%s:%d] queue reply to %s:%d on tag %d", \
__FILE__, __LINE__, \
(queue->peer)->info->nptr->nspace, \
- (queue->peer)->info->rank, (queue->tag), \
- (queue->peer)->send_ev_active); \
+ (queue->peer)->info->rank, (queue->tag)); \
event_assign(&queue->ev, pmix_globals.evbase, -1, \
EV_WRITE, _queue_message, queue); \
event_priority_set(&queue->ev, 0); \
@@ -723,7 +720,7 @@ static void _register_client(int sd,
short args, void *cbdata)
* someone has been waiting for a request on a
remote proc
* in one of our nspaces, but we didn't know all
the local procs
* and so couldn't determine the proc was remote */
- pmix_pending_nspace_fix(nptr);
+ pmix_pending_nspace_requests(nptr);
}
/* let the caller know we are done */
if (NULL != cd->opcbfunc) {
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
new file mode 100644
index 0000000..2cb75cf
--- /dev/null
+++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
@@ -0,0 +1,552 @@
+/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
+ * Copyright (c) 2014-2015 Research Organization for
Information Science
+ * and Technology (RIST). All
rights reserved.
+ * Copyright (c) 2014-2015 Artem Y. Polyakov
<artpo...@gmail.com>.
+ * All rights reserved.
+ * Copyright (c) 2015 Mellanox Technologies, Inc.
+ * All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#include <private/autogen/config.h>
+#include <pmix/rename.h>
+#include <private/types.h>
+#include <private/pmix_stdint.h>
+#include <private/pmix_socket_errno.h>
+
+#include <pmix_server.h>
+#include "src/include/pmix_globals.h"
+
+#ifdef HAVE_STRING_H
+#include <string.h>
+#endif
+#include <fcntl.h>
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+#ifdef HAVE_SYS_UN_H
+#include <sys/un.h>
+#endif
+#ifdef HAVE_SYS_UIO_H
+#include <sys/uio.h>
+#endif
+#ifdef HAVE_SYS_TYPES_H
+#include <sys/types.h>
+#endif
+#include PMIX_EVENT_HEADER
+
+#include "src/class/pmix_list.h"
+#include "src/buffer_ops/buffer_ops.h"
+#include "src/util/argv.h"
+#include "src/util/error.h"
+#include "src/util/output.h"
+#include "src/util/pmix_environ.h"
+#include "src/util/progress_threads.h"
+#include "src/usock/usock.h"
+#include "src/sec/pmix_sec.h"
+
+#include "pmix_server_ops.h"
+
+extern pmix_server_module_t pmix_host_server;
+
+typedef struct {
+ pmix_object_t super;
+ pmix_event_t ev;
+ pmix_status_t status;
+ const char *data;
+ size_t ndata;
+ pmix_dmdx_local_t *lcd;
+ pmix_release_cbfunc_t relcbfunc;
+ void *cbdata;
+} pmix_dmdx_reply_caddy_t;
+static void dcd_con(pmix_dmdx_reply_caddy_t *p)
+{
+ p->status = PMIX_ERROR;
+ p->ndata = 0;
+ p->lcd = NULL;
+ p->relcbfunc = NULL;
+ p->cbdata = NULL;
+}
+PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
+ pmix_object_t, dcd_con, NULL);
+
+
+static void dmdx_cbfunc(pmix_status_t status, const char
*data,
+ size_t ndata, void *cbdata,
+ pmix_release_cbfunc_t relfn, void *relcbdata);
+static pmix_status_t _satisfy_request(pmix_hash_table_t
*ht, int rank,
+ pmix_modex_cbfunc_t cbfunc, void *cbdata);
+static pmix_status_t create_local_tracker(char nspace[],
int rank,
+ pmix_info_t info[], size_t ninfo,
+ pmix_modex_cbfunc_t cbfunc,
+ void *cbdata,
+ pmix_dmdx_local_t **lcd);
+
+
+/* declare a function whose sole purpose is to
+ * free data that we provided to our host server
+ * when servicing dmodex requests */
+static void relfn(void *cbdata)
+{
+ char *data = (char*)cbdata;
+ free(data);
+}
+
+
+pmix_status_t pmix_server_get(pmix_buffer_t *buf,
+ pmix_modex_cbfunc_t cbfunc,
+ void *cbdata)
+{
+ int32_t cnt;
+ pmix_status_t rc;
+ int rank;
+ char *cptr;
+ char nspace[PMIX_MAX_NSLEN+1];
+ pmix_nspace_t *ns, *nptr;
+ pmix_info_t *info=NULL;
+ size_t ninfo=0;
+ pmix_dmdx_local_t *lcd, *cd;
+ pmix_rank_info_t *iptr;
+ pmix_hash_table_t *ht;
+ bool local;
+
+ pmix_output_verbose(2, pmix_globals.debug_output,
+ "recvd GET");
+
+ /* setup */
+ memset(nspace, 0, sizeof(nspace));
+
+ /* retrieve the nspace and rank of the requested proc */
+ cnt = 1;
+ if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&cptr, &cnt, PMIX_STRING))) {
+ PMIX_ERROR_LOG(rc);
+ return rc;
+ }
+ (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
+ free(cptr);
+ cnt = 1;
+ if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&rank, &cnt, PMIX_INT))) {
+ PMIX_ERROR_LOG(rc);
+ return rc;
+ }
+ /* retrieve any provided info structs */
+ cnt = 1;
+ if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&ninfo, &cnt, PMIX_SIZE))) {
+ PMIX_ERROR_LOG(rc);
+ return rc;
+ }
+ if (0 < ninfo) {
+ PMIX_INFO_CREATE(info, ninfo);
+ cnt = ninfo;
+ if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
info, &cnt, PMIX_INFO))) {
+ PMIX_ERROR_LOG(rc);
+ PMIX_INFO_FREE(info, ninfo);
+ return rc;
+ }
+ }
+
+ /* find the nspace object for this client */
+ nptr = NULL;
+ PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces,
pmix_nspace_t) {
+ if (0 == strcmp(nspace, ns->nspace)) {
+ nptr = ns;
+ break;
+ }
+ }
+
+ pmix_output_verbose(2, pmix_globals.debug_output,
+ "%s:%d EXECUTE GET FOR %s:%d",
+ pmix_globals.myid.nspace,
+ pmix_globals.myid.rank, nspace, rank);
+
+ if (NULL == nptr || NULL == nptr->server) {
+ /* this is for an nspace we don't know about yet, so
+ * record the request for data from this process and
+ * give the host server a chance to tell us about
it */
+ rc = create_local_tracker(nspace, rank, info, ninfo,
+ cbfunc, cbdata, &lcd);
+ return rc;
+ }
+
+ /* We have to wait for all local clients to be
registered before
+ * we can know whether this request is for data from a
local or a
+ * remote client because one client might ask for data
about another
+ * client that the host RM hasn't told us about yet.
Fortunately,
+ * we do know how many clients to expect, so first
check to see if
+ * all clients have been registered with us */
+ if (!nptr->server->all_registered) {
+ /* we cannot do anything further, so just track
this request
+ * for now */
+ rc = create_local_tracker(nspace, rank, info, ninfo,
+ cbfunc, cbdata, &lcd);
+ return rc;
+ }
+
+ /* Since we know about all the local clients in this
nspace,
+ * let's first try to satisfy the request with any
available data.
+ * By default, we assume we are looking for data from
a remote
+ * client, and then check to see if this is one of my
local
+ * clients - if so, then we look in that hash table */
+ ht = &nptr->server->remote;
+ local = false;
+ PMIX_LIST_FOREACH(iptr, &nptr->server->ranks,
pmix_rank_info_t) {
+ if (iptr->rank == rank) {
+ /* it is known local client - check the local
table */
+ ht = &nptr->server->mylocal;
+ local = true;
+ break;
+ }
+ }
+
+ /* see if we already have this data */
+ rc = _satisfy_request(ht, rank, cbfunc, cbdata);
+ if( PMIX_SUCCESS == rc ){
+ /* request was successfully satisfied */
+ PMIX_INFO_FREE(info, ninfo);
+ return rc;
+ }
+
+ /* If we get here, then we don't have the data at this
time. Check
+ * to see if we already have a pending request for the
data - if
+ * we do, then we can just wait for it to arrive */
+ rc = create_local_tracker(nspace, rank, info, ninfo,
+ cbfunc, cbdata, &lcd);
+ if (PMIX_SUCCESS == rc) {
+ /* we are already waiting for the data - nothing more
+ * for us to do as the function added the new request
+ * to the tracker for us */
+ return PMIX_SUCCESS;
+ }
+ if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) {
+ /* we have a problem - e.g., out of memory */
+ return rc;
+ }
+
+ /* Getting here means that we didn't already have a
request for
+ * data pending, and so we created a new tracker
for this
+ * request. We know the identity of all our local
clients, so
+ * if this is one, then we have nothing further to do
- we will
+ * fulfill the request once the process commits its
data */
+ if (local) {
+ return PMIX_SUCCESS;
+ }
+
+ /* this isn't a local client of ours, so we need to
ask the host
+ * resource manager server to please get the info for
us from
+ * whomever is hosting the target process */
+ if (NULL != pmix_host_server.direct_modex) {
+ rc = pmix_host_server.direct_modex(&lcd->proc,
info, ninfo, dmdx_cbfunc, lcd);
+ } else {
+ /* if we don't have direct modex feature, just
respond with "not found" */
+ cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
+ PMIX_INFO_FREE(info, ninfo);
+ pmix_list_remove_item(&pmix_server_globals.local_reqs,
&lcd->super);
+ PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
+ PMIX_RELEASE(lcd);
+ rc = PMIX_ERR_NOT_FOUND;
+ }
+
+ return rc;
+}
+
+static pmix_status_t create_local_tracker(char nspace[],
int rank,
+ pmix_info_t info[], size_t ninfo,
+ pmix_modex_cbfunc_t cbfunc,
+ void *cbdata,
+ pmix_dmdx_local_t **ld)
+{
+ pmix_dmdx_local_t *lcd, *cd;
+ pmix_dmdx_request_t *req;
+ pmix_status_t rc;
+
+ /* define default */
+ *ld = NULL;
+
+ /* see if we already have an existing request for data
+ * from this namespace/rank */
+ lcd = NULL;
+ PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs,
pmix_dmdx_local_t) {
+ if (0 != strncmp(nspace, cd->proc.nspace,
PMIX_MAX_NSLEN) ||
+ rank != cd->proc.rank ) {
+ continue;
+ }
+ lcd = cd;
+ break;
+ }
+ if (NULL != lcd) {
+ /* we already have a request, so just track that
someone
+ * else wants data from the same target */
+ rc = PMIX_SUCCESS; // indicates we found an
existing request
+ goto complete;
+ }
+ /* we do not have an existing request, so let's create
+ * one and add it to our list */
+ lcd = PMIX_NEW(pmix_dmdx_local_t);
+ if (NULL == lcd){
+ PMIX_INFO_FREE(info, ninfo);
+ return PMIX_ERR_NOMEM;
+ }
+ strncpy(lcd->proc.nspace, nspace, PMIX_MAX_NSLEN);
+ lcd->proc.rank = rank;
+ lcd->info = info;
+ lcd->ninfo = ninfo;
+ pmix_list_append(&pmix_server_globals.local_reqs,
&lcd->super);
+ rc = PMIX_ERR_NOT_FOUND; // indicates that we created
a new request tracker
+
+ complete:
+ /* track this specific requestor so we return the
+ * data to them */
+ req = PMIX_NEW(pmix_dmdx_request_t);
+ req->cbfunc = cbfunc;
+ req->cbdata = cbdata;
+ pmix_list_append(&lcd->loc_reqs, &req->super);
+ *ld = lcd;
+ return rc;
+}
+
+void pmix_pending_nspace_requests(pmix_nspace_t *nptr)
+{
+ pmix_dmdx_local_t *cd, *cd_next;
+
+ /* Now that we know all local ranks, go along request
list and ask for remote data
+ * for the non-local ranks, and resolve all pending
requests for local procs
+ * that were waiting for registration to complete
+ */
+ PMIX_LIST_FOREACH_SAFE(cd, cd_next,
&pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
+ pmix_rank_info_t *info;
+ bool found = false;
+
+ if (0 != strncmp(nptr->nspace, cd->proc.nspace,
PMIX_MAX_NSLEN) ) {
+ continue;
+ }
+
+ PMIX_LIST_FOREACH(info, &nptr->server->ranks,
pmix_rank_info_t) {
+ if (info->rank == cd->proc.rank) {
+ found = true; // we will satisy this
request upon commit from new proc
+ break;
+ }
+ }
+
+ /* if not found - this is remote process and we
need to send
+ * corresponding direct modex request */
+ if( !found ){
+ if( NULL != pmix_host_server.direct_modex ){
+ pmix_host_server.direct_modex(&cd->proc, cd->info,
cd->ninfo, dmdx_cbfunc, cd);
+ } else {
+ pmix_dmdx_request_t *req, *req_next;
+ PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs,
pmix_dmdx_request_t) {
+ req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata,
NULL, NULL);
+ pmix_list_remove_item(&cd->loc_reqs, &req->super);
+ PMIX_RELEASE(req);
+ }
+ pmix_list_remove_item(&pmix_server_globals.local_reqs,
&cd->super);
+ PMIX_RELEASE(cd);
+ }
+ }
+ }
+}
+
+static pmix_status_t _satisfy_request(pmix_hash_table_t
*ht, int rank,
+ pmix_modex_cbfunc_t cbfunc, void *cbdata)
+{
+ pmix_status_t rc;
+ pmix_value_t *val;
+ char *data;
+ size_t sz;
+ pmix_buffer_t xfer, pbkt, *xptr;
+
+ /* check to see if this data already has been
+ * obtained as a result of a prior direct modex
request from
+ * a remote peer, or due to data from a local client
+ * having been committed */
+ rc = pmix_hash_fetch(ht, rank, "modex", &val);
+ if (PMIX_SUCCESS == rc && NULL != val) {
+ /* the client is expecting this to arrive as a
byte object
+ * containing a buffer, so package it accordingly */
+ PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
+ PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
+ xptr = &xfer;
+ PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes,
val->data.bo.size);
+ pmix_bfrop.pack(&pbkt, &xptr, 1, PMIX_BUFFER);
+ xfer.base_ptr = NULL; // protect the passed data
+ xfer.bytes_used = 0;
+ PMIX_DESTRUCT(&xfer);
+ PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
+ PMIX_DESTRUCT(&pbkt);
+ PMIX_VALUE_RELEASE(val);
+ /* pass it back */
+ cbfunc(rc, data, sz, cbdata, relfn, data);
+ return rc;
+ }
+ return PMIX_ERR_NOT_FOUND;
+}
+
+/* Resolve pending requests to this namespace/rank */
+pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr,
int rank,
+ pmix_status_t status, pmix_dmdx_local_t *lcd)
+{
+ pmix_dmdx_local_t *cd;
+
+ /* find corresponding request (if exists) */
+ if (NULL == lcd && NULL != nptr) {
+ PMIX_LIST_FOREACH(cd,
&pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
+ if (0 != strncmp(nptr->nspace,
cd->proc.nspace, PMIX_MAX_NSLEN) ||
+ rank != cd->proc.rank) {
+ continue;
+ }
+ lcd = cd;
+ break;
+ }
+ }
+
+ /* If somebody was interested in this rank */
+ if (NULL != lcd) {
+ pmix_dmdx_request_t *req;
+
+ if (PMIX_SUCCESS != status){
+ /* if we've got an error for this request -
just forward it*/
+ PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
+ /* if we can't satisfy this request -
respond with error */
+ req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
+ }
+ } else if (NULL != nptr) {
+ /* if we've got the blob - try to satisfy
requests */
+ pmix_hash_table_t *ht;
+ pmix_rank_info_t *iptr;
+
+ /* by default we are looking for the remote
data */
+ ht = &nptr->server->remote;
+ /* check if this rank is local */
+ PMIX_LIST_FOREACH(iptr, &nptr->server->ranks,
pmix_rank_info_t) {
+ if (iptr->rank == rank) {
+ ht = &nptr->server->mylocal;
+ break;
+ }
+ }
+
+ /* run through all the requests to this rank */
+ PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
+ pmix_status_t rc;
+ rc = _satisfy_request(ht, rank,
req->cbfunc, req->cbdata);
+ if( PMIX_SUCCESS != rc ){
+ /* if we can't satisfy this particular
request (missing key?) */
+ req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
+ }
+ }
+ }
+ /* remove all requests to this rank and cleanup
the corresponding structure */
+ pmix_list_remove_item(&pmix_server_globals.local_reqs,
(pmix_list_item_t*)lcd);
+ PMIX_RELEASE(lcd);
+ }
+ return PMIX_SUCCESS;
+}
+
+/* process the returned data from the host RM server */
+static void _process_dmdx_reply(int fd, short args, void
*cbdata)
+{
+ pmix_dmdx_reply_caddy_t *caddy =
(pmix_dmdx_reply_caddy_t *)cbdata;
+ pmix_kval_t *kp;
+ pmix_nspace_t *ns, *nptr;
+ pmix_status_t rc;
+ pmix_buffer_t xfer, pbkt, *xptr;
+
+ pmix_output_verbose(2, pmix_globals.debug_output,
+ "[%s:%d] process dmdx reply from %s:%d",
+ __FILE__, __LINE__,
+ caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
+
+ /* find the nspace object for this client */
+ nptr = NULL;
+ PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces,
pmix_nspace_t) {
+ if (0 == strcmp(caddy->lcd->proc.nspace,
ns->nspace)) {
+ nptr = ns;
+ break;
+ }
+ }
+
+ if (NULL == nptr) {
+ /* should be impossible */
+ PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
+ caddy->status = PMIX_ERR_NOT_FOUND;
+ goto cleanup;
+ }
+
+ /* if the request was successfully satisfied, then
store the data
+ * in our hash table for remote procs. Although we
could immediately
+ * resolve any outstanding requests on our tracking
list, we instead
+ * store the data first so we can immediately satisfy
any future
+ * requests. Then, rather than duplicate the resolve
code here, we
+ * will let the pmix_pending_resolve function go ahead
and retrieve
+ * it from the hash table */
+ if (PMIX_SUCCESS == caddy->status) {
+ kp = PMIX_NEW(pmix_kval_t);
+ kp->key = strdup("modex");
+ PMIX_VALUE_CREATE(kp->value, 1);
+ kp->value->type = PMIX_BYTE_OBJECT;
+ /* we don't know if the host is going to save this
data
+ * or not, so we have to copy it - the client is
expecting
+ * this to arrive as a byte object containing a
buffer, so
+ * package it accordingly */
+ kp->value->data.bo.bytes = malloc(caddy->ndata);
+ memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
+ kp->value->data.bo.size = caddy->ndata;
+ /* store it in the appropriate hash */
+ if (PMIX_SUCCESS != (rc =
pmix_hash_store(&nptr->server->remote,
caddy->lcd->proc.rank, kp))) {
+ PMIX_ERROR_LOG(rc);
+ }
+ PMIX_RELEASE(kp); // maintain acctg
+ }
+
+ /* always execute the callback to avoid having the
client hang */
+ pmix_pending_resolve(nptr, caddy->lcd->proc.rank,
caddy->status, caddy->lcd);
+
+cleanup:
+ /* now call the release function so the host server
+ * knows it can release the data */
+ if (NULL != caddy->relcbfunc) {
+ caddy->relcbfunc(caddy->cbdata);
+ }
+ PMIX_RELEASE(caddy);
+}
+
+/* this is the callback function that the host RM server
will call
+ * when it gets requested info back from a remote server */
+static void dmdx_cbfunc(pmix_status_t status,
+ const char *data, size_t ndata,
void *cbdata,
+ pmix_release_cbfunc_t release_fn, void *release_cbdata)
+{
+ pmix_dmdx_reply_caddy_t *caddy;
+
+ /* because the host RM is calling us from their own
thread, we
+ * need to thread-shift into our local progress thread
before
+ * accessing any global info */
+ caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
+ caddy->status = status;
+ /* point to the callers cbfunc */
+ caddy->relcbfunc = release_fn;
+ caddy->cbdata = release_cbdata;
+
+ /* point to the returned data and our own internal
+ * tracker */
+ caddy->data = data;
+ caddy->ndata = ndata;
+ caddy->lcd = (pmix_dmdx_local_t *)cbdata;
+ pmix_output_verbose(2, pmix_globals.debug_output,
+ "[%s:%d] queue dmdx reply for %s:%d",
+ __FILE__, __LINE__,
+ caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
+ event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
+ _process_dmdx_reply, caddy);
+ event_priority_set(&caddy->ev, 0);
+ event_active(&caddy->ev, EV_WRITE, 1);
+}
+
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
index 4a4abd1..43d35b5 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
+++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.c
@@ -58,246 +58,6 @@
pmix_server_module_t pmix_host_server = {0};
-static void dmdx_cbfunc(pmix_status_t status, const char
*data,
- size_t ndata, void *cbdata,
- pmix_release_cbfunc_t relfn, void *relcbdata);
-
-typedef struct {
- pmix_object_t super;
- pmix_event_t ev;
- pmix_status_t status;
- const char *data;
- size_t ndata;
- pmix_dmdx_local_t *lcd;
- pmix_release_cbfunc_t relcbfunc;
- void *cbdata;
-} pmix_dmdx_reply_caddy_t;
-PMIX_CLASS_INSTANCE(pmix_dmdx_reply_caddy_t,
- pmix_object_t, NULL, NULL);
-
-
-static void relfn(void *cbdata)
-{
- char *data = (char*)cbdata;
- free(data);
-}
-
-static pmix_status_t _satisfy_request(pmix_nspace_t *nptr,
int rank,
- pmix_hash_table_t *ht,
- pmix_modex_cbfunc_t cbfunc, void *cbdata)
-{
- pmix_status_t rc;
- pmix_buffer_t pbkt, xfer;
- pmix_value_t *val;
- char *data;
- size_t sz;
-
- /* check to see if this data already has been
- * obtained as a result of a prior direct modex
request from
- * another local peer */
- rc = pmix_hash_fetch(ht, rank, "modex", &val);
- if (PMIX_SUCCESS == rc && NULL != val) {
- PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
- PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
- pmix_buffer_t *pxfer = &xfer;
- PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes,
val->data.bo.size);
- pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
- xfer.base_ptr = NULL;
- xfer.bytes_used = 0;
- PMIX_DESTRUCT(&xfer);
- PMIX_VALUE_RELEASE(val);
- PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
- PMIX_DESTRUCT(&pbkt);
- /* pass it back */
- cbfunc(rc, data, sz, cbdata, relfn, data);
- return rc;
- }
- return PMIX_ERR_NOT_FOUND;
-}
-
-pmix_status_t pmix_pending_request(pmix_nspace_t *nptr,
int rank,
- pmix_info_t *info, size_t ninfo,
- pmix_modex_cbfunc_t cbfunc, void *cbdata)
-{
- pmix_dmdx_local_t *lcd = NULL, *cd;
- pmix_rank_info_t *iptr;
- pmix_hash_table_t *ht;
- pmix_status_t rc;
-
- /* 1. Try to satisfy the request right now */
-
- /* by default we are looking for the remote data */
- ht = &nptr->server->remote;
- PMIX_LIST_FOREACH(iptr, &nptr->server->ranks,
pmix_rank_info_t) {
- if (iptr->rank == rank) {
- /* in case it is known local rank - check
local table */
- ht = &nptr->server->mylocal;
- break;
- }
- }
-
- rc = _satisfy_request(nptr, rank, ht, cbfunc, cbdata);
- if( PMIX_SUCCESS == rc ){
- /* request was successfully satisfied */
- PMIX_INFO_FREE(info, ninfo);
- return rc;
- }
-
- /* 2. We were unable to satisfy request right now.
- * Look for existing requests to this namespace/rank */
- PMIX_LIST_FOREACH(cd, &pmix_server_globals.local_reqs,
pmix_dmdx_local_t) {
- if (0 != strncmp(nptr->nspace, cd->proc.nspace,
PMIX_MAX_NSLEN) ||
- rank != cd->proc.rank ) {
- continue;
- }
- lcd = cd;
- break;
- }
-
- /* 3. If no requests exists then:
- * - if all local clients are registered then we were
called because
- * the remote data was requested. Create request and
call direct modex
- * to retrieve the data
- * - if not all local ranks were registered, we need
to wait untill
- * pmix_pending_localy_fin would be called to resolve
this. Just add the
- * request for now.
- */
- if (NULL == lcd) {
- lcd = PMIX_NEW(pmix_dmdx_local_t);
- if (NULL == lcd){
- PMIX_INFO_FREE(info, ninfo);
- return PMIX_ERR_NOMEM;
- }
- strncpy(lcd->proc.nspace, nptr->nspace, PMIX_MAX_NSLEN);
- lcd->proc.rank = rank;
- lcd->info = info;
- lcd->ninfo = ninfo;
- pmix_list_append(&pmix_server_globals.local_reqs,
&lcd->super);
-
- /* check & send request if need/possible */
- if (nptr->server->all_registered && NULL == info) {
- if (NULL != pmix_host_server.direct_modex) {
- pmix_host_server.direct_modex(&lcd->proc, info, ninfo,
dmdx_cbfunc, lcd);
- } else {
- /* if we don't have direct modex feature,
just respond with "not found" */
- cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL);
- PMIX_INFO_FREE(info, ninfo);
- pmix_list_remove_item(&pmix_server_globals.local_reqs,
&lcd->super);
- PMIX_LIST_DESTRUCT(&lcd->loc_reqs);
- PMIX_RELEASE(lcd);
- return PMIX_SUCCESS;
- }
- }
- }
- pmix_dmdx_request_t *req = PMIX_NEW(pmix_dmdx_request_t);
- req->cbfunc = cbfunc;
- req->cbdata = cbdata;
- pmix_list_append(&lcd->loc_reqs, &req->super);
- return PMIX_SUCCESS;
-}
-
-void pmix_pending_nspace_fix(pmix_nspace_t *nptr)
-{
- pmix_dmdx_local_t *cd, *cd_next;
-
- /* Now when we know all local ranks, go along request
list and ask for remote data
- * for the non-local ranks, and resolve all pending
requests for local procs
- * that were waiting for registration to complete
- */
- PMIX_LIST_FOREACH_SAFE(cd, cd_next,
&pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
- pmix_rank_info_t *info;
- bool found = false;
-
- if (0 != strncmp(nptr->nspace, cd->proc.nspace,
PMIX_MAX_NSLEN) ) {
- continue;
- }
-
- PMIX_LIST_FOREACH(info, &nptr->server->ranks,
pmix_rank_info_t) {
- if (info->rank == cd->proc.rank) {
- found = true;
- break;
- }
- }
-
- /* if not found - this is remote process and we
need to send
- * corresponding direct modex request */
- if( !found ){
- if( NULL != pmix_host_server.direct_modex ){
- pmix_host_server.direct_modex(&cd->proc, cd->info,
cd->ninfo, dmdx_cbfunc, cd);
- } else {
- pmix_dmdx_request_t *req, *req_next;
- PMIX_LIST_FOREACH_SAFE(req, req_next, &cd->loc_reqs,
pmix_dmdx_request_t) {
- req->cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, req->cbdata,
NULL, NULL);
- pmix_list_remove_item(&cd->loc_reqs, &req->super);
- PMIX_RELEASE(req);
- }
- pmix_list_remove_item(&pmix_server_globals.local_reqs,
&cd->super);
- PMIX_RELEASE(cd);
- }
- }
- }
-}
-
-/* Resolve pending requests to this namespace/rank */
-pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr,
int rank,
- pmix_status_t status, pmix_dmdx_local_t *lcd)
-{
- pmix_dmdx_local_t *cd;
-
- /* find corresponding request (if exists) */
- if( NULL == lcd ){
- PMIX_LIST_FOREACH(cd,
&pmix_server_globals.local_reqs, pmix_dmdx_local_t) {
- if (0 != strncmp(nptr->nspace,
cd->proc.nspace, PMIX_MAX_NSLEN) ||
- rank != cd->proc.rank) {
- continue;
- }
- lcd = cd;
- break;
- }
- }
-
- /* If somebody was interested in this rank */
- if( NULL != lcd ){
- pmix_dmdx_request_t *req;
-
- if (PMIX_SUCCESS != status){
- /* if we've got an error for this request -
just forward it*/
- PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
- /* if we can't satisfy this request -
respond with error */
- req->cbfunc(status, NULL, 0, req->cbdata, NULL, NULL);
- }
- } else {
- /* if we've got the blob - try to satisfy
requests */
- pmix_hash_table_t *ht;
- pmix_rank_info_t *iptr;
-
- /* by default we are looking for the remote
data */
- ht = &nptr->server->remote;
- /* check if this rank is local */
- PMIX_LIST_FOREACH(iptr, &nptr->server->ranks,
pmix_rank_info_t) {
- if (iptr->rank == rank) {
- ht = &nptr->server->mylocal;
- break;
- }
- }
-
- /* run through all the requests to this rank */
- PMIX_LIST_FOREACH(req, &lcd->loc_reqs, pmix_dmdx_request_t) {
- pmix_status_t rc;
- rc = _satisfy_request(nptr, rank, ht,
req->cbfunc, req->cbdata);
- if( PMIX_SUCCESS != rc ){
- /* if we can't satisfy this particular
request (missing key?) */
- req->cbfunc(rc, NULL, 0, req->cbdata, NULL, NULL);
- }
- }
- }
- /* remove all requests to this rank and cleanup
the corresponding structure */
- pmix_list_remove_item(&pmix_server_globals.local_reqs,
(pmix_list_item_t*)lcd);
- PMIX_RELEASE(lcd);
- }
- return PMIX_SUCCESS;
-}
-
pmix_status_t pmix_server_abort(pmix_peer_t *peer,
pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc, void *cbdata)
{
@@ -436,13 +196,7 @@ pmix_status_t
pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
* may not be a contribution */
if (PMIX_SUCCESS ==
pmix_hash_fetch(&nptr->server->myremote, info->rank,
"modex", &val) &&
NULL != val) {
- PMIX_CONSTRUCT(&xfer, pmix_buffer_t);
- PMIX_LOAD_BUFFER(&xfer, val->data.bo.bytes,
val->data.bo.size);
- pmix_buffer_t *pxfer = &xfer;
- pmix_bfrop.pack(&pbkt, &pxfer, 1, PMIX_BUFFER);
- xfer.base_ptr = NULL;
- xfer.bytes_used = 0;
- PMIX_DESTRUCT(&xfer);
+ PMIX_LOAD_BUFFER(&pbkt, val->data.bo.bytes,
val->data.bo.size);
PMIX_VALUE_RELEASE(val);
}
PMIX_UNLOAD_BUFFER(&pbkt, data, sz);
@@ -457,7 +211,7 @@ pmix_status_t
pmix_server_commit(pmix_peer_t *peer, pmix_buffer_t *buf)
PMIX_RELEASE(dcd);
}
}
- /* see if anyone local is waiting on this data- could
be more than one */
+ /* see if anyone local is waiting on this data - could
be more than one */
return pmix_pending_resolve(nptr, info->rank,
PMIX_SUCCESS, NULL);
}
@@ -826,163 +580,6 @@ pmix_status_t
pmix_server_fence(pmix_server_caddy_t *cd,
return rc;
}
-static void _process_dmdx_reply(int fd, short args, void
*cbdata)
-{
- pmix_dmdx_reply_caddy_t *caddy =
(pmix_dmdx_reply_caddy_t *)cbdata;
- pmix_kval_t *kp;
- pmix_nspace_t *ns, *nptr;
- pmix_status_t rc;
-
- pmix_output_verbose(2, pmix_globals.debug_output,
- "[%s:%d] queue dmdx reply from %s:%d",
- __FILE__, __LINE__,
- caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
-
- /* find the nspace object for this client */
- nptr = NULL;
- PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces,
pmix_nspace_t) {
- if (0 == strcmp(caddy->lcd->proc.nspace,
ns->nspace)) {
- nptr = ns;
- break;
- }
- }
-
- if (NULL == nptr) {
- /* should be impossible */
- PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
- caddy->status = PMIX_ERR_NOT_FOUND;
- goto cleanup;
- }
-
- if (PMIX_SUCCESS == caddy->status) {
- kp = PMIX_NEW(pmix_kval_t);
- kp->key = strdup("modex");
- PMIX_VALUE_CREATE(kp->value, 1);
- kp->value->type = PMIX_BYTE_OBJECT;
- /* we don't know if the host is going to save this
data
- * or not, so we have to copy it */
- kp->value->data.bo.bytes = (char*)malloc(caddy->ndata);
- memcpy(kp->value->data.bo.bytes, caddy->data, caddy->ndata);
- kp->value->data.bo.size = caddy->ndata;
- /* store it in the appropriate hash */
- if (PMIX_SUCCESS != (rc =
pmix_hash_store(&nptr->server->remote,
caddy->lcd->proc.rank, kp))) {
- PMIX_ERROR_LOG(rc);
- }
- PMIX_RELEASE(kp); // maintain acctg
- }
-
-cleanup:
- /* always execute the callback to avoid having the
client hang */
- pmix_pending_resolve(nptr, caddy->lcd->proc.rank,
caddy->status, caddy->lcd);
-
- /* now call the release function so the host server
- * knows it can release the data */
- if (NULL != caddy->relcbfunc) {
- caddy->relcbfunc(caddy->cbdata);
- }
- PMIX_RELEASE(caddy);
-}
-
-static void dmdx_cbfunc(pmix_status_t status,
- const char *data, size_t ndata,
void *cbdata,
- pmix_release_cbfunc_t release_fn, void *release_cbdata)
-{
- pmix_dmdx_reply_caddy_t *caddy;
- caddy = PMIX_NEW(pmix_dmdx_reply_caddy_t);
- caddy->status = status;
- /* point to the callers cbfunc */
- caddy->relcbfunc = release_fn;
- caddy->cbdata = release_cbdata;
-
- caddy->data = data;
- caddy->ndata = ndata;
- caddy->lcd = (pmix_dmdx_local_t *)cbdata;
- pmix_output_verbose(2, pmix_globals.debug_output,
"[%s:%d] queue dmdx reply %s:%d",
- __FILE__, __LINE__,
- caddy->lcd->proc.nspace, caddy->lcd->proc.rank);
- event_assign(&caddy->ev, pmix_globals.evbase, -1, EV_WRITE,
- _process_dmdx_reply, caddy);
- event_priority_set(&caddy->ev, 0);
- event_active(&caddy->ev, EV_WRITE, 1);
-}
-
-pmix_status_t pmix_server_get(pmix_buffer_t *buf,
- pmix_modex_cbfunc_t cbfunc,
- void *cbdata)
-{
- int32_t cnt;
- pmix_status_t rc;
- int rank;
- char *cptr;
- char nspace[PMIX_MAX_NSLEN+1];
- pmix_nspace_t *ns, *nptr;
- pmix_info_t *info=NULL;
- size_t ninfo=0;
-
- pmix_output_verbose(2, pmix_globals.debug_output,
- "recvd GET");
-
- /* setup */
- memset(nspace, 0, sizeof(nspace));
-
- /* retrieve the nspace and rank of the requested proc */
- cnt = 1;
- if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&cptr, &cnt, PMIX_STRING))) {
- PMIX_ERROR_LOG(rc);
- return rc;
- }
- (void)strncpy(nspace, cptr, PMIX_MAX_NSLEN);
- free(cptr);
- cnt = 1;
- if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&rank, &cnt, PMIX_INT))) {
- PMIX_ERROR_LOG(rc);
- return rc;
- }
- /* find the nspace object for this client */
- nptr = NULL;
- PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces,
pmix_nspace_t) {
- if (0 == strcmp(nspace, ns->nspace)) {
- nptr = ns;
- break;
- }
- }
-
- pmix_output_verbose(2, pmix_globals.debug_output,
- "%s:%d EXECUTE GET FOR %s:%d",
- pmix_globals.myid.nspace,
- pmix_globals.myid.rank, nspace, rank);
-
- /* retrieve any provided info structs */
- cnt = 1;
- if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
&ninfo, &cnt, PMIX_SIZE))) {
- PMIX_ERROR_LOG(rc);
- return rc;
- }
- if (0 < ninfo) {
- PMIX_INFO_CREATE(info, ninfo);
- cnt = ninfo;
- if (PMIX_SUCCESS != (rc = pmix_bfrop.unpack(buf,
info, &cnt, PMIX_INFO))) {
- PMIX_ERROR_LOG(rc);
- PMIX_INFO_FREE(info, ninfo);
- return rc;
- }
- }
-
- if (NULL == nptr) {
- /* this is for an nspace we don't know about yet, so
- * give the host server a chance to tell us about
it */
- nptr = PMIX_NEW(pmix_nspace_t);
- (void)strncpy(nptr->nspace, nspace, PMIX_MAX_NSLEN);
- pmix_list_append(&pmix_globals.nspaces, &nptr->super);
- }
- /* if we don't have any ranks for this job, protect
ourselves here */
- if (NULL == nptr->server) {
- nptr->server = PMIX_NEW(pmix_server_nspace_t);
- }
-
- return pmix_pending_request(nptr, rank, info, ninfo,
cbfunc, cbdata);
-}
-
pmix_status_t pmix_server_publish(pmix_peer_t *peer,
pmix_buffer_t *buf,
pmix_op_cbfunc_t cbfunc, void *cbdata)
diff --git
a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
index c6279d5..9129b6b 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
+++ b/opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_ops.h
@@ -183,10 +183,7 @@ void pmix_stop_listening(void);
bool pmix_server_trk_update(pmix_server_trkr_t *trk);
-pmix_status_t pmix_pending_request(pmix_nspace_t *nptr,
int rank,
- pmix_info_t *info, size_t ninfo,
- pmix_modex_cbfunc_t cbfunc, void *cbdata);
-void pmix_pending_nspace_fix(pmix_nspace_t *nptr);
+void pmix_pending_nspace_requests(pmix_nspace_t *nptr);
pmix_status_t pmix_pending_resolve(pmix_nspace_t *nptr,
int rank,
pmix_status_t status, pmix_dmdx_local_t *lcd);
diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
index 8cc4bcd..90c42ed 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
+++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.c
@@ -123,6 +123,8 @@ const char*
PMIx_Error_string(pmix_status_t errnum)
case PMIX_EXISTS:
return "EXISTS";
+ case PMIX_ERR_SILENT:
+ return "SILENT";
case PMIX_ERROR:
return "ERROR";
case PMIX_SUCCESS:
diff --git a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
index f72227a..e43ac47 100644
--- a/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
+++ b/opal/mca/pmix/pmix1xx/pmix/src/util/error.h
@@ -28,9 +28,13 @@
BEGIN_C_DECLS
-#define PMIX_ERROR_LOG(r) \
- pmix_output(0, "PMIX ERROR: %s in file %s at line %d", \
- PMIx_Error_string((r)), __FILE__, __LINE__);
+#define PMIX_ERROR_LOG(r) \
+ do { \
+ if (PMIX_ERR_SILENT != (r)) { \
+ pmix_output(0, "PMIX ERROR: %s in file %s at
line %d", \
+ PMIx_Error_string((r)), __FILE__, __LINE__); \
+ } \
+ }while(0);
#define PMIX_REPORT_ERROR(e) \
do { \
diff --git a/opal/mca/pmix/pmix1xx/pmix1_client.c
b/opal/mca/pmix/pmix1xx/pmix1_client.c
index f1ba0d5..e9c50b7 100644
--- a/opal/mca/pmix/pmix1xx/pmix1_client.c
+++ b/opal/mca/pmix/pmix1xx/pmix1_client.c
@@ -217,6 +217,7 @@ int pmix1_store_local(const
opal_process_name_t *proc, opal_value_t *val)
}
}
if (NULL == job) {
+ OPAL_ERROR_LOG(OPAL_ERR_NOT_FOUND);
return OPAL_ERR_NOT_FOUND;
}
(void)strncpy(p.nspace, job->nspace, PMIX_MAX_NSLEN);
diff --git a/opal/mca/pmix/pmix1xx/pmix1_server_south.c
b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
index ae42de0..f0d0f11 100644
--- a/opal/mca/pmix/pmix1xx/pmix1_server_south.c
+++ b/opal/mca/pmix/pmix1xx/pmix1_server_south.c
@@ -156,10 +156,10 @@ static void opcbfunc(pmix_status_t
status, void *cbdata)
}
int pmix1_server_register_nspace(opal_jobid_t jobid,
- int nlocalprocs,
- opal_list_t *info,
- opal_pmix_op_cbfunc_t cbfunc,
- void *cbdata)
+ int nlocalprocs,
+ opal_list_t *info,
+ opal_pmix_op_cbfunc_t cbfunc,
+ void *cbdata)
{
opal_value_t *kv, *k2;
pmix_info_t *pinfo, *pmap;
@@ -168,10 +168,17 @@ int
pmix1_server_register_nspace(opal_jobid_t jobid,
pmix_status_t rc;
pmix1_opcaddy_t *op;
opal_list_t *pmapinfo;
+ opal_pmix1_jobid_trkr_t *job;
/* convert the jobid */
(void)snprintf(nspace, PMIX_MAX_NSLEN,
opal_convert_jobid_to_string(jobid));
+ /* store this job in our list of known nspaces */
+ job = OBJ_NEW(opal_pmix1_jobid_trkr_t);
+ (void)strncpy(job->nspace, nspace, PMIX_MAX_NSLEN);
+ job->jobid = jobid;
+ opal_list_append(&mca_pmix_pmix1xx_component.jobids,
&job->super);
+
/* convert the list to an array of pmix_info_t */
if (NULL != info) {
sz = opal_list_get_size(info);
@@ -220,10 +227,10 @@ int
pmix1_server_register_nspace(opal_jobid_t jobid,
int pmix1_server_register_client(const opal_process_name_t
*proc,
- uid_t uid, gid_t gid,
- void *server_object,
- opal_pmix_op_cbfunc_t cbfunc,
- void *cbdata)
+ uid_t uid, gid_t gid,
+ void *server_object,
+ opal_pmix_op_cbfunc_t cbfunc,
+ void *cbdata)
{
pmix_status_t rc;
pmix1_opcaddy_t *op;
@@ -275,7 +282,7 @@ static void dmdx_response(pmix_status_t
status, char *data, size_t sz, void *cbd
}
int pmix1_server_dmodex(const opal_process_name_t *proc,
- opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
+ opal_pmix_modex_cbfunc_t cbfunc, void *cbdata)
{
pmix1_opcaddy_t *op;
pmix_status_t rc;
diff --git a/orte/orted/pmix/pmix_server.c
b/orte/orted/pmix/pmix_server.c
index 953145d..ee5582c 100644
--- a/orte/orted/pmix/pmix_server.c
+++ b/orte/orted/pmix/pmix_server.c
@@ -505,7 +505,6 @@ static void pmix_server_dmdx_resp(int
status, orte_process_name_t* sender,
int rc, ret, room_num;
int32_t cnt;
opal_process_name_t target;
- opal_value_t kv;
pmix_server_req_t *req;
uint8_t *data = NULL;
int32_t ndata = 0;
@@ -542,29 +541,14 @@ static void pmix_server_dmdx_resp(int
status, orte_process_name_t* sender,
return;
}
- /* if we got something, store the blobs locally so we can
- * meet any further requests without doing a remote fetch.
- * This must be done as a single blob for later
retrieval */
- if (ORTE_SUCCESS == ret && NULL != data) {
- OBJ_CONSTRUCT(&kv, opal_value_t);
- kv.key = strdup("modex");
- kv.type = OPAL_BYTE_OBJECT;
- kv.data.bo.bytes = data;
- kv.data.bo.size = ndata;
- if (OPAL_SUCCESS != (rc =
opal_pmix.store_local(&target, &kv))) {
- ORTE_ERROR_LOG(rc);
- }
- kv.data.bo.bytes = NULL; // protect the data
- kv.data.bo.size = 0;
- OBJ_DESTRUCT(&kv);
- }
-
/* check the request out of the tracking hotel */
opal_hotel_checkout_and_return_occupant(&orte_pmix_server_globals.reqs,
room_num, (void**)&req);
/* return the returned data to the requestor */
if (NULL != req) {
if (NULL != req->mdxcbfunc) {
req->mdxcbfunc(ret, (char*)data, ndata, req->cbdata,
relcbfunc, data);
+ } else {
+ free(data);
}
OBJ_RELEASE(req);
}
diff --git a/orte/orted/pmix/pmix_server_fence.c
b/orte/orted/pmix/pmix_server_fence.c
index b3b0e33..765c1c2 100644
--- a/orte/orted/pmix/pmix_server_fence.c
+++ b/orte/orted/pmix/pmix_server_fence.c
@@ -197,6 +197,12 @@ static void dmodex_req(int sd, short
args, void *cbdata)
goto callback;
}
+ /* if we are the host daemon, then this is a local
request, so
+ * just wait for the data to come in */
+ if (ORTE_PROC_MY_NAME->vpid == dmn->name.vpid) {
+ return;
+ }
+
/* construct a request message */
buf = OBJ_NEW(opal_buffer_t);
if (OPAL_SUCCESS != (rc = opal_dss.pack(buf,
&req->target, 1, OPAL_NAME))) {
-----------------------------------------------------------------------
Summary of changes:
ompi/runtime/ompi_mpi_init.c | 7 +-
opal/mca/pmix/base/pmix_base_frame.c | 11 +-
opal/mca/pmix/pmix.h | 13 +-
.../pmix1xx/pmix/include/pmix/pmix_common.h.in | 3 +-
.../pmix/pmix1xx/pmix/src/client/pmix_client_get.c | 1 +
opal/mca/pmix/pmix1xx/pmix/src/server/Makefile.am | 3 +-
.../mca/pmix/pmix1xx/pmix/src/server/pmix_server.c | 11 +-
.../pmix/pmix1xx/pmix/src/server/pmix_server_get.c | 552 +++++++++++++++++++++
.../pmix/pmix1xx/pmix/src/server/pmix_server_ops.c | 407 +--------------
.../pmix/pmix1xx/pmix/src/server/pmix_server_ops.h | 5 +-
opal/mca/pmix/pmix1xx/pmix/src/util/error.c | 2 +
opal/mca/pmix/pmix1xx/pmix/src/util/error.h | 10 +-
opal/mca/pmix/pmix1xx/pmix1_client.c | 1 +
opal/mca/pmix/pmix1xx/pmix1_server_south.c | 25 +-
orte/orted/pmix/pmix_server.c | 20 +-
orte/orted/pmix/pmix_server_fence.c | 6 +
16 files changed, 620 insertions(+), 457 deletions(-)
create mode 100644 opal/mca/pmix/pmix1xx/pmix/src/server/pmix_server_get.c
hooks/post-receive
--
open-mpi/ompi
_______________________________________________
ompi-commits mailing list
ompi-comm...@open-mpi.org <mailto:ompi-comm...@open-mpi.org>
http://www.open-mpi.org/mailman/listinfo.cgi/ompi-commits
_______________________________________________
devel mailing list
de...@open-mpi.org <mailto:de...@open-mpi.org>
Subscription: http://www.open-mpi.org/mailman/listinfo.cgi/devel
Link to this post: http://www.open-mpi.org/community/lists/devel/2015/10/18297.php