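There was a typo in this patch: the barrier added to the MPI_MODE_NOPRECEDE fence short-cut passes `module->comm->c_coll.coll_barrier` as its second argument, where the barrier module (`module->comm->c_coll.coll_barrier_module`) is expected. I saw the fix go into trunk, but I missed that this PR was still pending. Please cherry-pick 7c574a35309 to v2.x as well.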
George. On Sun, Feb 7, 2016 at 8:01 AM, <git...@crest.iu.edu> wrote: > This is an automated email from the git hooks/post-receive script. It was > generated because a ref change was pushed to the repository containing > the project "open-mpi/ompi-release". > > The branch, v2.x has been updated > via efeac60a18a06d4224394dbce41c4486b28ca194 (commit) > via 0e8f2675bf13dd6aa34cbf4492f92a5cddcaaf6f (commit) > via 245147390edeb9cd9fab1d08610f83841588989b (commit) > via 98532dda3d642f845498a40e7e1c660f13ab67aa (commit) > via b1cb049a9d9a486c6e08bd2966b0033e30df7055 (commit) > from 1280d534de59030c4311fb55df455d0739362e9f (commit) > > Those revisions listed above that are new to this repository have > not appeared on any other notification email; so we list those > revisions in full, below. > > - Log ----------------------------------------------------------------- > > https://github.com/open-mpi/ompi-release/commit/efeac60a18a06d4224394dbce41c4486b28ca194 > > commit efeac60a18a06d4224394dbce41c4486b28ca194 > Merge: 1280d53 0e8f267 > Author: Jeff Squyres <jsquy...@users.noreply.github.com> > Date: Sun Feb 7 08:01:26 2016 -0500 > > Merge pull request #947 from hjelmn/v2.x_osc_pt2pt_fixes > > v2.x osc/pt2pt fixes > > > > > https://github.com/open-mpi/ompi-release/commit/0e8f2675bf13dd6aa34cbf4492f92a5cddcaaf6f > > commit 0e8f2675bf13dd6aa34cbf4492f92a5cddcaaf6f > Author: Nathan Hjelm <hje...@lanl.gov> > Date: Thu Feb 4 16:59:39 2016 -0700 > > osc/pt2pt: bug fixes > > This commit fixes several bugs identified by @ggouaillardet and MTT: > > - Fix SEGV in long send completion caused by missing update to the > request callback data. > > - Add an MPI_Barrier to the fence short-cut. This fixes potential > semantic issues where messages may be received before fence is > reached. > > - Ensure fragments are flushed when using request-based RMA. This > allows MPI_Test/MPI_Wait/etc to work as expected. > > - Restore the tag space back to 16-bits. 
It was intended that the > space be expanded to 32-bits but the required change to the > fragment headers was not committed. The tag space may be expanded > in a later commit. > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > (cherry picked from commit > open-mpi/ompi@5b9c82a9648b06364b695e199711e1c26a3afeeb) > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h > b/ompi/mca/osc/pt2pt/osc_pt2pt.h > index 409011d..68ca022 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h > @@ -631,8 +631,8 @@ static inline void osc_pt2pt_add_pending > (ompi_osc_pt2pt_pending_t *pending) > opal_list_append > (&mca_osc_pt2pt_component.pending_operations, &pending->super)); > } > > -#define OSC_PT2PT_FRAG_TAG 0x80000 > -#define OSC_PT2PT_FRAG_MASK 0x7ffff > +#define OSC_PT2PT_FRAG_TAG 0x10000 > +#define OSC_PT2PT_FRAG_MASK 0x0ffff > > /** > * get_tag: > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > index 58d6b40..0b3c2e0 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > @@ -147,6 +147,7 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win) > > /* short-circuit the noprecede case */ > if (0 != (assert & MPI_MODE_NOPRECEDE)) { > + module->comm->c_coll.coll_barrier (module->comm, > module->comm->c_coll.coll_barrier); > OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, > "osc pt2pt: fence end (short circuit)")); > return ret; > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > index a1dcfd7..1205767 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > @@ -58,6 +58,9 @@ static int ompi_osc_pt2pt_req_comm_complete > (ompi_request_t *request) > "ompi_osc_pt2pt_req_comm_complete called tag = > %d", > request->req_status.MPI_TAG)); > > + /* update the cbdata for 
ompi_osc_pt2pt_comm_complete */ > + request->req_complete_cb_data = pt2pt_request->module; > + > if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) > { > ompi_osc_pt2pt_request_complete (pt2pt_request, > request->req_status.MPI_ERROR); > } > @@ -218,8 +221,8 @@ static inline int ompi_osc_pt2pt_gacc_self > (ompi_osc_pt2pt_sync_t *pt2pt_sync, c > ((unsigned long) target_disp * module->disp_unit); > int ret; > > - /* if we are in active target mode wait until all post messages > arrive */ > - ompi_osc_pt2pt_sync_wait (pt2pt_sync); > + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, > ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: > starting local " > + "get accumulate")); > > ompi_osc_pt2pt_accumulate_lock (module); > > @@ -250,6 +253,9 @@ static inline int ompi_osc_pt2pt_gacc_self > (ompi_osc_pt2pt_sync_t *pt2pt_sync, c > > ompi_osc_pt2pt_accumulate_unlock (module); > > + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, > ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local > get " > + "accumulate complete")); > + > if (request) { > /* NTH: is it ok to use an ompi error code here? 
*/ > ompi_osc_pt2pt_request_complete (request, ret); > @@ -310,14 +316,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const > void *origin_addr, int origin_ > payload_len = origin_dt->super.size * origin_count; > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + > payload_len; > > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false, true); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true, false); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr, true, false); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -469,14 +475,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > payload_len = origin_dt->super.size * origin_count; > > frag_len = sizeof(*header) + ddt_len + payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false, true); > if (OMPI_SUCCESS != ret) { > frag_len = sizeof(*header) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true, !request); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(*header) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, 
> &frag, &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr, true, !request); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -488,7 +494,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > tag = get_rtag (module); > } > > - if (is_long_msg || is_long_datatype) { > + if (is_long_msg) { > /* wait for synchronization before posting a long message */ > ompi_osc_pt2pt_sync_wait (pt2pt_sync); > } > @@ -631,7 +637,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void > *origin_addr, const void *compar > } > > frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + > payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false, false); > if (OMPI_SUCCESS != ret) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -663,9 +669,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void > *origin_addr, const void *compar > return ret; > } > > - ret = ompi_osc_pt2pt_frag_finish(module, frag); > - > - return ret; > + return ompi_osc_pt2pt_frag_finish (module, frag); > } > > > @@ -779,11 +783,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void > *origin_addr, int origin_co > ddt_len = ompi_datatype_pack_description_length(target_dt); > > frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false, release_req); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len */ > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false, release_req); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return 
OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -961,6 +965,11 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > return OMPI_SUCCESS; > } > > + if (!release_req) { > + /* wait for epoch to begin before starting operation */ > + ompi_osc_pt2pt_sync_wait (pt2pt_sync); > + } > + > /* optimize the self case. TODO: optimize the local case */ > if (ompi_comm_rank (module->comm) == target_rank) { > *request = &pt2pt_request->super; > @@ -987,14 +996,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > } > > frag_len = sizeof(*header) + ddt_len + payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, > &ptr, false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, > &ptr, false, release_req); > if (OMPI_SUCCESS != ret) { > frag_len = sizeof(*header) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, > &frag, &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, > &frag, &ptr, true, release_req); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(*header) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, > frag_len, &frag, &ptr, true); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, > frag_len, &frag, &ptr, true, release_req); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -1014,11 +1023,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > /* increment the number of outgoing fragments */ > ompi_osc_signal_outgoing (module, target_rank, > pt2pt_request->outstanding_requests); > > - if (!release_req) { > - /* wait for epoch to begin before starting operation */ > - ompi_osc_pt2pt_sync_wait (pt2pt_sync); > - } > - > header = (ompi_osc_pt2pt_header_acc_t *) ptr; > header->base.flags = 0; > header->len = frag_len; > diff --git 
a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > index 09bd285..681e73a 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send > (ompi_osc_pt2pt_module_t *module, int target, > char *ptr; > int ret; > > - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, > false); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, > false, true); > if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { > memcpy (ptr, data, len); > > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > index f55e6cb..da51b7d 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > @@ -57,16 +57,62 @@ static inline int ompi_osc_pt2pt_frag_finish > (ompi_osc_pt2pt_module_t *module, > return OMPI_SUCCESS; > } > > +static inline ompi_osc_pt2pt_frag_t > *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module, > + > ompi_osc_pt2pt_peer_t *peer, > + > size_t request_len) > +{ > + ompi_osc_pt2pt_frag_t *curr; > + > + /* to ensure ordering flush the buffer on the peer */ > + curr = peer->active_frag; > + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, > NULL)) { > + /* If there's something pending, the pending finish will > + start the buffer. Otherwise, we need to start it now. 
*/ > + int ret = ompi_osc_pt2pt_frag_finish (module, curr); > + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > + return NULL; > + } > + } > + > + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get > (&mca_osc_pt2pt_component.frags); > + if (OPAL_UNLIKELY(NULL == curr)) { > + return NULL; > + } > + > + curr->target = peer->rank; > + > + curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; > + curr->top = (char*) (curr->header + 1); > + curr->remain_len = mca_osc_pt2pt_component.buffer_size; > + curr->module = module; > + curr->pending = 1; > + > + curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; > + curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; > + if (module->passive_target_access_epoch) { > + curr->header->base.flags |= > OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; > + } > + curr->header->source = ompi_comm_rank(module->comm); > + curr->header->num_ops = 1; > + > + return curr; > +} > + > /* > - * Note: module lock must be held during this operation > + * Note: this function takes the module lock > + * > + * buffered sends will cache the fragment on the peer object associated > with the > + * target. unbuffered-sends will cause the target fragment to be flushed > and > + * will not be cached on the peer. this causes the fragment to be flushed > as > + * soon as it is sent. this allows request-based rma fragments to be > completed > + * so MPI_Test/MPI_Wait/etc will work as expected. > */ > static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t > *module, int target, > size_t request_len, > ompi_osc_pt2pt_frag_t **buffer, > - char **ptr, bool long_send) > + char **ptr, bool long_send, > bool buffered) > { > ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, > target); > ompi_osc_pt2pt_frag_t *curr; > - int ret; > > /* osc pt2pt headers can have 64-bit values. 
these will need to be > aligned > * on an 8-byte boundary on some architectures so we up align the > allocation > @@ -77,51 +123,34 @@ static inline int ompi_osc_pt2pt_frag_alloc > (ompi_osc_pt2pt_module_t *module, in > return OMPI_ERR_OUT_OF_RESOURCE; > } > > + OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, > ompi_osc_base_framework.framework_output, > + "attempting to allocate buffer for %lu bytes to > target %d. long send: %d, " > + "buffered: %d", (unsigned long) request_len, > target, long_send, buffered)); > + > OPAL_THREAD_LOCK(&module->lock); > - curr = peer->active_frag; > - if (NULL == curr || curr->remain_len < request_len || (long_send && > curr->pending_long_sends == 32)) { > - if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, > NULL)) { > - /* If there's something pending, the pending finish will > - start the buffer. Otherwise, we need to start it now. */ > - ret = ompi_osc_pt2pt_frag_finish (module, curr); > - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > + if (buffered) { > + curr = peer->active_frag; > + if (NULL == curr || curr->remain_len < request_len || (long_send > && curr->pending_long_sends == 32)) { > + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, > request_len); > + if (OPAL_UNLIKELY(NULL == curr)) { > OPAL_THREAD_UNLOCK(&module->lock); > - return ret; > + return OMPI_ERR_OUT_OF_RESOURCE; > } > - } > - > - curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get > (&mca_osc_pt2pt_component.frags); > - if (OPAL_UNLIKELY(NULL == curr)) { > - return OMPI_ERR_OUT_OF_RESOURCE; > - } > - > - curr->target = target; > > - curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer; > - curr->top = (char*) (curr->header + 1); > - curr->remain_len = mca_osc_pt2pt_component.buffer_size; > - curr->module = module; > - curr->pending = 2; > - curr->pending_long_sends = long_send; > - > - curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; > - curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; > - if 
(module->passive_target_access_epoch) { > - curr->header->base.flags |= > OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; > + curr->pending_long_sends = long_send; > + peer->active_frag = curr; > + } else { > + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); > + curr->pending_long_sends += long_send; > } > - curr->header->source = ompi_comm_rank(module->comm); > - curr->header->num_ops = 1; > > - if (curr->remain_len < request_len) { > + OPAL_THREAD_ADD32(&curr->pending, 1); > + } else { > + curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, > request_len); > + if (OPAL_UNLIKELY(NULL == curr)) { > OPAL_THREAD_UNLOCK(&module->lock); > - return OMPI_ERR_TEMP_OUT_OF_RESOURCE; > + return OMPI_ERR_OUT_OF_RESOURCE; > } > - > - peer->active_frag = curr; > - } else { > - OPAL_THREAD_ADD32(&curr->pending, 1); > - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); > - curr->pending_long_sends += long_send; > } > > *ptr = curr->top; > > > > https://github.com/open-mpi/ompi-release/commit/245147390edeb9cd9fab1d08610f83841588989b > > commit 245147390edeb9cd9fab1d08610f83841588989b > Author: Nathan Hjelm <hje...@lanl.gov> > Date: Tue Feb 2 12:44:17 2016 -0700 > > osc/pt2pt: eager sends are always active if MPI_MODE_NOCHECK is used > > This commit fixes open-mpi/ompi#1299. > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > (cherry picked from > open-mpi/ompi@519fffb65e7a9502b0e5edeb72b1ad2d802daed4) > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c > index 0ddc4cf..099aa56 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c > @@ -8,7 +8,7 @@ > * University of Stuttgart. All rights reserved. > * Copyright (c) 2004-2005 The Regents of the University of California. > * All rights reserved. > - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. 
All rights > + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2010 IBM Corporation. All rights reserved. > * Copyright (c) 2012-2013 Sandia National Laboratories. All rights > reserved. > @@ -244,6 +244,8 @@ static int ompi_osc_pt2pt_lock_internal_execute > (ompi_osc_pt2pt_module_t *module > } > > } > + } else { > + lock->eager_send_active = true; > } > > return OMPI_SUCCESS; > > > > https://github.com/open-mpi/ompi-release/commit/98532dda3d642f845498a40e7e1c660f13ab67aa > > commit 98532dda3d642f845498a40e7e1c660f13ab67aa > Author: Nathan Hjelm <hje...@lanl.gov> > Date: Tue Feb 2 12:22:21 2016 -0700 > > osc/pt2pt: various threading fixes > > This commit fixes several bugs identified by a new multi-threaded RMA > benchmarking suite. The following bugs have been identified and fixed: > > - The code that signaled the actual start of an access epoch changed > the eager_send_active flag on a synchronization object without > holding the object's lock. This could cause another thread waiting > on eager sends to block indefinitely because the entirety of > ompi_osc_pt2pt_sync_expected could execute between the check of > eager_send_active and the condition wait of > ompi_osc_pt2pt_sync_wait. > > - The bookkeeping of fragments could get screwed up when performing > long put/accumulate operations from different threads. This was > caused by the fragment flush code at the end of both put and > accumulate. This code was put in place to avoid sending a large > number of unexpected messages to a peer. To fix the bookkeeping > issue we now 1) wait for eager sends to be active before starting > any large isends, and 2) keep track of the number of large isends > associated with a fragment. If the number of large isends reaches > 32 the active fragment is flushed. > > - Use atomics to update the large receive/send tag counters. This > prevents duplicate tags from being used.
The tag space has also > been updated to use the entire 16-bits of the tag space. > > These changes should also fix open-mpi/ompi#1299. > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > (cherry picked from > open-mpi/ompi@d7264aa61394ffa278cc9ea08bc7b4704fb680e1) > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h > b/ompi/mca/osc/pt2pt/osc_pt2pt.h > index 1f3c204..409011d 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h > @@ -8,7 +8,7 @@ > * University of Stuttgart. All rights reserved. > * Copyright (c) 2004-2005 The Regents of the University of California. > * All rights reserved. > - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. > * Copyright (c) 2012-2013 Sandia National Laboratories. All rights > reserved. > @@ -149,20 +149,20 @@ struct ompi_osc_pt2pt_module_t { > uint32_t *epoch_outgoing_frag_count; > > /** cyclic counter for a unique tage for long messages. 
*/ > - unsigned int tag_counter; > - unsigned int rtag_counter; > + uint32_t tag_counter; > + uint32_t rtag_counter; > > /* Number of outgoing fragments that have completed since the > begining of time */ > - uint32_t outgoing_frag_count; > + volatile uint32_t outgoing_frag_count; > /* Next outgoing fragment count at which we want a signal on cond */ > - uint32_t outgoing_frag_signal_count; > + volatile uint32_t outgoing_frag_signal_count; > > /* Number of incoming fragments that have completed since the > begining of time */ > - uint32_t active_incoming_frag_count; > + volatile uint32_t active_incoming_frag_count; > /* Next incoming buffer count at which we want a signal on cond */ > - uint32_t active_incoming_frag_signal_count; > + volatile uint32_t active_incoming_frag_signal_count; > > /** Number of targets locked/being locked */ > unsigned int passive_target_access_epoch; > @@ -409,14 +409,6 @@ int > ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module, > int tag, > struct ompi_communicator_t *comm); > > -int ompi_osc_pt2pt_component_isend(ompi_osc_pt2pt_module_t *module, > - const void *buf, > - size_t count, > - struct ompi_datatype_t *datatype, > - int dest, > - int tag, > - struct ompi_communicator_t *comm); > - > /** > * ompi_osc_pt2pt_progress_pending_acc: > * > @@ -639,8 +631,8 @@ static inline void osc_pt2pt_add_pending > (ompi_osc_pt2pt_pending_t *pending) > opal_list_append > (&mca_osc_pt2pt_component.pending_operations, &pending->super)); > } > > -#define OSC_PT2PT_FRAG_TAG 0x10000 > -#define OSC_PT2PT_FRAG_MASK 0x0ffff > +#define OSC_PT2PT_FRAG_TAG 0x80000 > +#define OSC_PT2PT_FRAG_MASK 0x7ffff > > /** > * get_tag: > @@ -658,11 +650,8 @@ static inline int get_tag(ompi_osc_pt2pt_module_t > *module) > /* the LSB of the tag is used be the receiver to determine if the > message is a passive or active target (ie, where to mark > completion). 
*/ > - int tmp = module->tag_counter + > !!(module->passive_target_access_epoch); > - > - module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK; > - > - return tmp; > + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) > &module->tag_counter, 4); > + return (tmp & OSC_PT2PT_FRAG_MASK) | > !!(module->passive_target_access_epoch); > } > > static inline int get_rtag(ompi_osc_pt2pt_module_t *module) > @@ -670,11 +659,8 @@ static inline int get_rtag(ompi_osc_pt2pt_module_t > *module) > /* the LSB of the tag is used be the receiver to determine if the > message is a passive or active target (ie, where to mark > completion). */ > - int tmp = module->rtag_counter + > !!(module->passive_target_access_epoch); > - > - module->rtag_counter = (module->rtag_counter + 4) & > OSC_PT2PT_FRAG_MASK; > - > - return tmp; > + int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) > &module->rtag_counter, 4); > + return (tmp & OSC_PT2PT_FRAG_MASK) | > !!(module->passive_target_access_epoch); > } > /** > * ompi_osc_pt2pt_accumulate_lock: > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > index e169add..58d6b40 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c > @@ -8,7 +8,7 @@ > * University of Stuttgart. All rights reserved. > * Copyright (c) 2004-2005 The Regents of the University of California. > * All rights reserved. > - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2010 IBM Corporation. All rights reserved. > * Copyright (c) 2012-2013 Sandia National Laboratories. All rights > reserved. 
> @@ -211,7 +211,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int > assert, ompi_win_t *win) > ompi_osc_pt2pt_module_t *module = GET_MODULE(win); > ompi_osc_pt2pt_sync_t *sync = &module->all_sync; > > - OPAL_THREAD_LOCK(&sync->lock); > + OPAL_THREAD_LOCK(&module->lock); > > /* check if we are already in an access epoch */ > if (ompi_osc_pt2pt_access_epoch_active (module)) { > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > index b22f783..a1dcfd7 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > @@ -8,7 +8,7 @@ > * University of Stuttgart. All rights reserved. > * Copyright (c) 2004-2005 The Regents of the University of California. > * All rights reserved. > - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. > * Copyright (c) 2012-2013 Sandia National Laboratories. All rights > reserved. 
> @@ -34,27 +34,55 @@ > #include <stdio.h> > > /* progress an OSC request */ > +static int ompi_osc_pt2pt_comm_complete (ompi_request_t *request) > +{ > + ompi_osc_pt2pt_module_t *module = > + (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; > + > + OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, > + "isend_completion_cb called")); > + > + mark_outgoing_completion(module); > + > + /* put this request on the garbage colletion list */ > + osc_pt2pt_gc_add_request (module, request); > + > + return OMPI_SUCCESS; > +} > + > static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request) > { > ompi_osc_pt2pt_request_t *pt2pt_request = (ompi_osc_pt2pt_request_t > *) request->req_complete_cb_data; > - ompi_osc_pt2pt_module_t *module = pt2pt_request->module; > > OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, > "ompi_osc_pt2pt_req_comm_complete called tag = > %d", > request->req_status.MPI_TAG)); > > - mark_outgoing_completion (module); > - > if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) > { > ompi_osc_pt2pt_request_complete (pt2pt_request, > request->req_status.MPI_ERROR); > } > > - /* put this request on the garbage colletion list */ > - osc_pt2pt_gc_add_request (module, request); > + return ompi_osc_pt2pt_comm_complete (request); > +} > > - return OMPI_SUCCESS; > +static inline int ompi_osc_pt2pt_data_isend (ompi_osc_pt2pt_module_t > *module, const void *buf, > + size_t count, > ompi_datatype_t *datatype, int dest, > + int tag, > ompi_osc_pt2pt_request_t *request) > +{ > + /* increment the outgoing send count */ > + ompi_osc_signal_outgoing (module, dest, 1); > + > + if (NULL != request) { > + ++request->outstanding_requests; > + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, > tag, module->comm, > + > ompi_osc_pt2pt_req_comm_complete, request); > + } > + > + return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, > module->comm, > + ompi_osc_pt2pt_comm_complete, > 
module); > } > > + > static int ompi_osc_pt2pt_dt_send_complete (ompi_request_t *request) > { > ompi_datatype_t *datatype = (ompi_datatype_t *) > request->req_complete_cb_data; > @@ -282,14 +310,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const > void *origin_addr, int origin_ > payload_len = origin_dt->super.size * origin_count; > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + > payload_len; > > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr, true); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -301,9 +329,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void > *origin_addr, int origin_ > tag = get_tag(module); > } > > - /* flush will be called at the end of this function. make sure all > post messages have > - * arrived. 
*/ > - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == > pt2pt_sync->type) { > + if (is_long_msg) { > + /* wait for eager sends to be active before starting a long put */ > ompi_osc_pt2pt_sync_wait (pt2pt_sync); > } > > @@ -361,18 +388,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const > void *origin_addr, int origin_ > header->tag = tag; > osc_pt2pt_hton(header, proc); > > - /* increase the outgoing signal count */ > - ompi_osc_signal_outgoing (module, target, 1); > - > - if (request) { > - request->outstanding_requests = 1; > - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, > origin_count, origin_dt, > - target, tag, > module->comm, ompi_osc_pt2pt_req_comm_complete, > - request); > - } else { > - ret = ompi_osc_pt2pt_component_isend (module,origin_addr, > origin_count, origin_dt, target, tag, > - module->comm); > - } > + ret = ompi_osc_pt2pt_data_isend (module,origin_addr, > origin_count, origin_dt, target, tag, > + request); > } > } while (0); > > @@ -380,14 +397,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const > void *origin_addr, int origin_ > header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; > } > > - ret = ompi_osc_pt2pt_frag_finish(module, frag); > - > - if (request || is_long_msg) { > - /* need to flush now in case the caller decides to wait on the > request */ > - ompi_osc_pt2pt_frag_flush_target (module, target); > - } > - > - return ret; > + return ompi_osc_pt2pt_frag_finish(module, frag); > } > > int > @@ -459,14 +469,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > payload_len = origin_dt->super.size * origin_count; > > frag_len = sizeof(*header) + ddt_len + payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > if (OMPI_SUCCESS != ret) { > frag_len = sizeof(*header) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = 
ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, true); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(*header) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, > &frag, &ptr, true); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -478,9 +488,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > tag = get_rtag (module); > } > > - /* flush will be called at the end of this function. make sure all > post messages have > - * arrived. */ > - if ((is_long_msg || request) && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == > pt2pt_sync->type) { > + if (is_long_msg || is_long_datatype) { > + /* wait for synchronization before posting a long message */ > ompi_osc_pt2pt_sync_wait (pt2pt_sync); > } > > @@ -538,18 +547,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > OPAL_OUTPUT_VERBOSE((25, > ompi_osc_base_framework.framework_output, > "acc: starting long accumulate with tag > %d", tag)); > > - /* increment the outgoing send count */ > - ompi_osc_signal_outgoing (module, target, 1); > - > - if (request) { > - request->outstanding_requests = 1; > - ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, > origin_count, origin_dt, > - target, tag, > module->comm, ompi_osc_pt2pt_req_comm_complete, > - request); > - } else { > - ret = ompi_osc_pt2pt_component_isend (module, > origin_addr, origin_count, origin_dt, target, tag, > - module->comm); > - } > + ret = ompi_osc_pt2pt_data_isend (module, origin_addr, > origin_count, origin_dt, target, tag, > + request); > } > } while (0); > > @@ -561,14 +560,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_VALID; > } > > - ret = ompi_osc_pt2pt_frag_finish(module, frag); > - > - if (is_long_msg || 
request) { > - /* need to flush now in case the caller decides to wait on the > request */ > - ompi_osc_pt2pt_frag_flush_target (module, target); > - } > - > - return ret; > + return ompi_osc_pt2pt_frag_finish(module, frag); > } > > int > @@ -639,7 +631,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void > *origin_addr, const void *compar > } > > frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + > payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > if (OMPI_SUCCESS != ret) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -787,11 +779,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void > *origin_addr, int origin_co > ddt_len = ompi_datatype_pack_description_length(target_dt); > > frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len */ > frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, > &ptr, false); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -804,9 +796,8 @@ static inline int ompi_osc_pt2pt_rget_internal (void > *origin_addr, int origin_co > /* for bookkeeping the get is "outgoing" */ > ompi_osc_signal_outgoing (module, target, 1); > > - /* flush will be called at the end of this function. make sure all > post messages have > - * arrived. 
*/ > - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == > pt2pt_sync->type) { > + if (!release_req) { > + /* wait for epoch to begin before starting rget operation */ > ompi_osc_pt2pt_sync_wait (pt2pt_sync); > } > > @@ -857,14 +848,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void > *origin_addr, int origin_co > *request = &pt2pt_request->super; > } > > - ret = ompi_osc_pt2pt_frag_finish(module, frag); > - > - if (!release_req) { > - /* need to flush now in case the caller decides to wait on the > request */ > - ompi_osc_pt2pt_frag_flush_target (module, target); > - } > - > - return ret; > + return ompi_osc_pt2pt_frag_finish(module, frag); > } > > int ompi_osc_pt2pt_rget (void *origin_addr, int origin_count, struct > ompi_datatype_t *origin_dt, > @@ -1003,14 +987,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > } > > frag_len = sizeof(*header) + ddt_len + payload_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, > &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, > &ptr, false); > if (OMPI_SUCCESS != ret) { > frag_len = sizeof(*header) + ddt_len; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, > &frag, &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, > &frag, &ptr, true); > if (OMPI_SUCCESS != ret) { > /* allocate space for the header plus space to store ddt_len > */ > frag_len = sizeof(*header) + 8; > - ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, > frag_len, &frag, &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, > frag_len, &frag, &ptr, true); > if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > @@ -1030,9 +1014,8 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > /* increment the number of outgoing fragments */ > ompi_osc_signal_outgoing (module, target_rank, > pt2pt_request->outstanding_requests); > > - /* flush 
will be called at the end of this function. make sure all > post messages have > - * arrived. */ > - if (!release_req && OMPI_OSC_PT2PT_SYNC_TYPE_PSCW == > pt2pt_sync->type) { > + if (!release_req) { > + /* wait for epoch to begin before starting operation */ > ompi_osc_pt2pt_sync_wait (pt2pt_sync); > } > > @@ -1100,14 +1083,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const > void *origin_addr, int origin > *request = (ompi_request_t *) pt2pt_request; > } > > - ret = ompi_osc_pt2pt_frag_finish(module, frag); > - > - if (!release_req) { > - /* need to flush now in case the caller decides to wait on the > request */ > - ompi_osc_pt2pt_frag_flush_target (module, target_rank); > - } > - > - return ret; > + return ompi_osc_pt2pt_frag_finish(module, frag); > } > > int ompi_osc_pt2pt_get_accumulate(const void *origin_addr, int > origin_count, > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > index 6883d79..09bd285 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c > @@ -8,7 +8,7 @@ > * University of Stuttgart. All rights reserved. > * Copyright (c) 2004-2005 The Regents of the University of California. > * All rights reserved. > - * Copyright (c) 2007-2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2007-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2009-2011 Oracle and/or its affiliates. All rights > reserved. > * Copyright (c) 2012-2013 Sandia National Laboratories. All rights > reserved. 
> @@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send > (ompi_osc_pt2pt_module_t *module, int target, > char *ptr; > int ret; > > - ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr); > + ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, > false); > if (OPAL_LIKELY(OMPI_SUCCESS == ret)) { > memcpy (ptr, data, len); > > @@ -1682,33 +1682,6 @@ int ompi_osc_pt2pt_component_irecv > (ompi_osc_pt2pt_module_t *module, void *buf, > osc_pt2pt_incoming_req_complete, > module); > } > > - > -static int > -isend_completion_cb(ompi_request_t *request) > -{ > - ompi_osc_pt2pt_module_t *module = > - (ompi_osc_pt2pt_module_t*) request->req_complete_cb_data; > - > - OPAL_OUTPUT_VERBOSE((10, ompi_osc_base_framework.framework_output, > - "isend_completion_cb called")); > - > - mark_outgoing_completion(module); > - > - /* put this request on the garbage colletion list */ > - osc_pt2pt_gc_add_request (module, request); > - > - return OMPI_SUCCESS; > -} > - > - > -int ompi_osc_pt2pt_component_isend (ompi_osc_pt2pt_module_t *module, > const void *buf, > - size_t count, struct ompi_datatype_t > *datatype, > - int dest, int tag, struct > ompi_communicator_t *comm) > -{ > - return ompi_osc_pt2pt_isend_w_cb (buf, count, datatype, dest, tag, > comm, > - isend_completion_cb, module); > -} > - > int ompi_osc_pt2pt_isend_w_cb (const void *ptr, int count, > ompi_datatype_t *datatype, int target, int tag, > ompi_communicator_t *comm, > ompi_request_complete_fn_t cb, void *ctx) > { > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > index 515ce82..f55e6cb 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_frag.h > @@ -1,7 +1,7 @@ > /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ > /* > * Copyright (c) 2012 Sandia National Laboratories. All rights > reserved. > - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. 
All rights > + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights > * reserved. > * $COPYRIGHT$ > * > @@ -33,7 +33,8 @@ struct ompi_osc_pt2pt_frag_t { > char *top; > > /* Number of operations which have started writing into the frag, but > not yet completed doing so */ > - int32_t pending; > + volatile int32_t pending; > + int32_t pending_long_sends; > ompi_osc_pt2pt_frag_header_t *header; > ompi_osc_pt2pt_module_t *module; > }; > @@ -44,12 +45,24 @@ extern int > ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_p > extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t > *module, int target); > extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module); > > +static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t > *module, > + ompi_osc_pt2pt_frag_t* > buffer) > +{ > + opal_atomic_wmb (); > + if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { > + opal_atomic_mb (); > + return ompi_osc_pt2pt_frag_start(module, buffer); > + } > + > + return OMPI_SUCCESS; > +} > + > /* > * Note: module lock must be held during this operation > */ > static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t > *module, int target, > size_t request_len, > ompi_osc_pt2pt_frag_t **buffer, > - char **ptr) > + char **ptr, bool long_send) > { > ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, > target); > ompi_osc_pt2pt_frag_t *curr; > @@ -66,29 +79,21 @@ static inline int ompi_osc_pt2pt_frag_alloc > (ompi_osc_pt2pt_module_t *module, in > > OPAL_THREAD_LOCK(&module->lock); > curr = peer->active_frag; > - if (NULL == curr || curr->remain_len < request_len) { > - opal_free_list_item_t *item = NULL; > - > - if (NULL != curr) { > - curr->remain_len = 0; > - peer->active_frag = NULL; > - opal_atomic_mb (); > - > + if (NULL == curr || curr->remain_len < request_len || (long_send && > curr->pending_long_sends == 32)) { > + if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, > 
NULL)) { > /* If there's something pending, the pending finish will > start the buffer. Otherwise, we need to start it now. */ > - if (0 == OPAL_THREAD_ADD32(&curr->pending, -1)) { > - ret = ompi_osc_pt2pt_frag_start(module, curr); > - if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > - return ret; > - } > + ret = ompi_osc_pt2pt_frag_finish (module, curr); > + if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) { > + OPAL_THREAD_UNLOCK(&module->lock); > + return ret; > } > } > > - item = opal_free_list_get (&mca_osc_pt2pt_component.frags); > - if (OPAL_UNLIKELY(NULL == item)) { > + curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get > (&mca_osc_pt2pt_component.frags); > + if (OPAL_UNLIKELY(NULL == curr)) { > return OMPI_ERR_OUT_OF_RESOURCE; > } > - curr = peer->active_frag = (ompi_osc_pt2pt_frag_t*) item; > > curr->target = target; > > @@ -96,7 +101,8 @@ static inline int ompi_osc_pt2pt_frag_alloc > (ompi_osc_pt2pt_module_t *module, in > curr->top = (char*) (curr->header + 1); > curr->remain_len = mca_osc_pt2pt_component.buffer_size; > curr->module = module; > - curr->pending = 1; > + curr->pending = 2; > + curr->pending_long_sends = long_send; > > curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG; > curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID; > @@ -104,12 +110,18 @@ static inline int ompi_osc_pt2pt_frag_alloc > (ompi_osc_pt2pt_module_t *module, in > curr->header->base.flags |= > OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET; > } > curr->header->source = ompi_comm_rank(module->comm); > - curr->header->num_ops = 0; > + curr->header->num_ops = 1; > > if (curr->remain_len < request_len) { > OPAL_THREAD_UNLOCK(&module->lock); > return OMPI_ERR_TEMP_OUT_OF_RESOURCE; > } > + > + peer->active_frag = curr; > + } else { > + OPAL_THREAD_ADD32(&curr->pending, 1); > + OPAL_THREAD_ADD32(&curr->header->num_ops, 1); > + curr->pending_long_sends += long_send; > } > > *ptr = curr->top; > @@ -117,24 +129,8 @@ static inline int ompi_osc_pt2pt_frag_alloc > (ompi_osc_pt2pt_module_t *module, 
in > > curr->top += request_len; > curr->remain_len -= request_len; > - OPAL_THREAD_UNLOCK(&module->lock); > > - OPAL_THREAD_ADD32(&curr->pending, 1); > - OPAL_THREAD_ADD32(&curr->header->num_ops, 1); > - > - return OMPI_SUCCESS; > -} > - > - > -/* > - * Note: module lock must be held for this operation > - */ > -static inline int ompi_osc_pt2pt_frag_finish(ompi_osc_pt2pt_module_t > *module, > - ompi_osc_pt2pt_frag_t* buffer) > -{ > - if (0 == OPAL_THREAD_ADD32(&buffer->pending, -1)) { > - return ompi_osc_pt2pt_frag_start(module, buffer); > - } > + OPAL_THREAD_UNLOCK(&module->lock); > > return OMPI_SUCCESS; > } > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c > index eddccf5..6741036 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.c > @@ -51,6 +51,7 @@ request_construct(ompi_osc_pt2pt_request_t *request) > request->super.req_status._cancelled = 0; > request->super.req_free = request_free; > request->super.req_cancel = request_cancel; > + request->outstanding_requests = 0; > } > > OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_request_t, > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h > b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h > index 07b9d53..dee5c86 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h > @@ -1,7 +1,7 @@ > /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ > /* > * Copyright (c) 2012 Sandia National Laboratories. All rights > reserved. > - * Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights > * reserved. > * Copyright (c) 2015 Research Organization for Information Science > * and Technology (RIST). All rights reserved. 
> @@ -57,6 +57,7 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t); > #define OMPI_OSC_PT2PT_REQUEST_RETURN(req) \ > do { \ > OMPI_REQUEST_FINI(&(req)->super); \ > + (req)->outstanding_requests = 0; \ > opal_free_list_return (&mca_osc_pt2pt_component.requests, \ > (opal_free_list_item_t *) (req)); \ > } while (0) > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h > b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h > index eee2964..f4e4adc 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_sync.h > @@ -1,6 +1,6 @@ > /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ > /* > - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights > + * Copyright (c) 2015-2016 Los Alamos National Security, LLC. All rights > * reserved. > * $COPYRIGHT$ > * > @@ -163,8 +163,10 @@ static inline void ompi_osc_pt2pt_sync_expected > (ompi_osc_pt2pt_sync_t *sync) > { > int32_t new_value = OPAL_THREAD_ADD32 (&sync->sync_expected, -1); > if (0 == new_value) { > + OPAL_THREAD_LOCK(&sync->lock); > sync->eager_send_active = true; > opal_condition_broadcast (&sync->cond); > + OPAL_THREAD_UNLOCK(&sync->lock); > } > } > > > > > https://github.com/open-mpi/ompi-release/commit/b1cb049a9d9a486c6e08bd2966b0033e30df7055 > > commit b1cb049a9d9a486c6e08bd2966b0033e30df7055 > Author: Gilles Gouaillardet <gil...@rist.or.jp> > Date: Tue Jan 5 16:57:37 2016 +0900 > > osc/pt2pt: use two distinct "namespaces" for tags > > (cherry picked from > open-mpi/ompi@06ecdb6aa7ee688f51de2b3ca05e9f0605a90099) > > Signed-off-by: Nathan Hjelm <hje...@lanl.gov> > > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h > b/ompi/mca/osc/pt2pt/osc_pt2pt.h > index 51b14b7..1f3c204 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt.h > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h > @@ -150,6 +150,7 @@ struct ompi_osc_pt2pt_module_t { > > /** cyclic counter for a unique tage for long messages. 
*/ > unsigned int tag_counter; > + unsigned int rtag_counter; > > /* Number of outgoing fragments that have completed since the > begining of time */ > @@ -659,11 +660,22 @@ static inline int get_tag(ompi_osc_pt2pt_module_t > *module) > completion). */ > int tmp = module->tag_counter + > !!(module->passive_target_access_epoch); > > - module->tag_counter = (module->tag_counter + 2) & OSC_PT2PT_FRAG_MASK; > + module->tag_counter = (module->tag_counter + 4) & OSC_PT2PT_FRAG_MASK; > > return tmp; > } > > +static inline int get_rtag(ompi_osc_pt2pt_module_t *module) > +{ > + /* the LSB of the tag is used be the receiver to determine if the > + message is a passive or active target (ie, where to mark > + completion). */ > + int tmp = module->rtag_counter + > !!(module->passive_target_access_epoch); > + > + module->rtag_counter = (module->rtag_counter + 4) & > OSC_PT2PT_FRAG_MASK; > + > + return tmp; > +} > /** > * ompi_osc_pt2pt_accumulate_lock: > * > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > index 5bb3a07..b22f783 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c > @@ -475,7 +475,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void > *origin_addr, int origin_count, > } > > is_long_msg = true; > - tag = get_tag (module); > + tag = get_rtag (module); > } > > /* flush will be called at the end of this function. 
make sure all > post messages have > diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c > b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c > index 6a8f53e..41bbe18 100644 > --- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c > +++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c > @@ -290,6 +290,7 @@ component_select(struct ompi_win_t *win, void **base, > size_t size, int disp_unit > /* fill in the function pointer part */ > memcpy(module, &ompi_osc_pt2pt_module_template, > sizeof(ompi_osc_base_module_t)); > + module->rtag_counter = 2; > > /* initialize the objects, so that always free in cleanup */ > OBJ_CONSTRUCT(&module->lock, opal_mutex_t); > > > ----------------------------------------------------------------------- > > Summary of changes: > ompi/mca/osc/pt2pt/osc_pt2pt.h | 36 +++--- > ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c | 5 +- > ompi/mca/osc/pt2pt/osc_pt2pt_comm.c | 170 > ++++++++++++-------------- > ompi/mca/osc/pt2pt/osc_pt2pt_component.c | 1 + > ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c | 31 +---- > ompi/mca/osc/pt2pt/osc_pt2pt_frag.h | 147 +++++++++++++--------- > ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c | 4 +- > ompi/mca/osc/pt2pt/osc_pt2pt_request.c | 1 + > ompi/mca/osc/pt2pt/osc_pt2pt_request.h | 3 +- > ompi/mca/osc/pt2pt/osc_pt2pt_sync.h | 4 +- > 10 files changed, 193 insertions(+), 209 deletions(-) > > > hooks/post-receive > -- > open-mpi/ompi-release > _______________________________________________ > ompi-commits mailing list > ompi-comm...@open-mpi.org > http://www.open-mpi.org/mailman/listinfo.cgi/ompi-commits >