Hi, Attached are two patches. The first one implements a new function, mca_pml_ob1_send_requst_copy_in_out(req, offset, len), that sends a given range of the request by copying data in/out of internal buffers. It also changes the behaviour of the pipeline protocol to send data from the end of a user buffer. The second patch fixes a deadlock by sending memory by copying if registration fails. It uses the facility introduced in the first patch.
Comments are welcome. As always, if there are no complaints I'll commit this to the trunk, so better complain before the commit :) -- Gleb.
diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c index 690a512..cdb88d0 100644 --- a/ompi/mca/pml/ob1/pml_ob1.c +++ b/ompi/mca/pml/ob1/pml_ob1.c @@ -253,7 +253,8 @@ int mca_pml_ob1_send_fin( ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, void *hdr_des, - uint8_t order + uint8_t order, + uint32_t status ) { mca_btl_base_descriptor_t* fin; @@ -262,7 +263,7 @@ int mca_pml_ob1_send_fin( MCA_PML_OB1_DES_ALLOC(bml_btl, fin, order, sizeof(mca_pml_ob1_fin_hdr_t)); if(NULL == fin) { - MCA_PML_OB1_ADD_FIN_TO_PENDING(proc, hdr_des, bml_btl, order); + MCA_PML_OB1_ADD_FIN_TO_PENDING(proc, hdr_des, bml_btl, order, status); return OMPI_ERR_OUT_OF_RESOURCE; } fin->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -274,6 +275,7 @@ int mca_pml_ob1_send_fin( hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FIN; hdr->hdr_des.pval = hdr_des; + hdr->hdr_fail = status; #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #ifdef WORDS_BIGENDIAN @@ -296,12 +298,11 @@ int mca_pml_ob1_send_fin( MCA_BTL_TAG_PML ); if(OMPI_SUCCESS != rc) { - MCA_PML_OB1_ADD_FIN_TO_PENDING(proc, hdr_des, bml_btl, order); MCA_BML_BASE_BTL_DES_RETURN(bml_btl, fin); + MCA_PML_OB1_ADD_FIN_TO_PENDING(proc, hdr_des, bml_btl, order, status); return OMPI_ERR_OUT_OF_RESOURCE; } - return OMPI_SUCCESS; } @@ -352,7 +353,8 @@ void mca_pml_ob1_process_pending_packets(mca_bml_base_btl_t* bml_btl) case MCA_PML_OB1_HDR_TYPE_FIN: rc = mca_pml_ob1_send_fin(pckt->proc, send_dst, pckt->hdr.hdr_fin.hdr_des.pval, - pckt->order); + pckt->order, + pckt->hdr.hdr_fin.hdr_fail); MCA_PML_OB1_PCKT_PENDING_RETURN(pckt); if(OMPI_ERR_OUT_OF_RESOURCE == rc) return; @@ -378,6 +380,7 @@ void mca_pml_ob1_process_pending_rdma(void) if(NULL == frag) break; if(frag->rdma_state == MCA_PML_OB1_RDMA_PUT) { + frag->retries++; rc = mca_pml_ob1_send_request_put_frag(frag); } else { rc = mca_pml_ob1_recv_request_get_frag(frag); diff --git a/ompi/mca/pml/ob1/pml_ob1.h b/ompi/mca/pml/ob1/pml_ob1.h index bb371f5..50c8f31 100644 
--- a/ompi/mca/pml/ob1/pml_ob1.h +++ b/ompi/mca/pml/ob1/pml_ob1.h @@ -54,6 +54,7 @@ struct mca_pml_ob1_t { size_t eager_limit; /* maximum eager limit size - overrides btl setting */ size_t send_pipeline_depth; size_t recv_pipeline_depth; + size_t rdma_put_retries_limit; bool leave_pinned; int leave_pinned_pipeline; @@ -266,7 +267,7 @@ do { \ (ompi_free_list_item_t*)pckt); \ } while(0) -#define MCA_PML_OB1_ADD_FIN_TO_PENDING(P, D, B, O) \ +#define MCA_PML_OB1_ADD_FIN_TO_PENDING(P, D, B, O, S) \ do { \ mca_pml_ob1_pckt_pending_t *_pckt; \ int _rc; \ @@ -274,6 +275,7 @@ do { \ MCA_PML_OB1_PCKT_PENDING_ALLOC(_pckt,_rc); \ _pckt->hdr.hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FIN; \ _pckt->hdr.hdr_fin.hdr_des.pval = (D); \ + _pckt->hdr.hdr_fin.hdr_fail = (S); \ _pckt->proc = (P); \ _pckt->bml_btl = (B); \ _pckt->order = (O); \ @@ -285,7 +287,7 @@ do { \ int mca_pml_ob1_send_fin(ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, - void *hdr_des, uint8_t order); + void *hdr_des, uint8_t order, uint32_t status); /* This function tries to resend FIN/ACK packets from pckt_pending queue. 
* Packets are added to the queue when sending of FIN or ACK is failed due to diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 198564d..d3237e5 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -111,7 +111,9 @@ int mca_pml_ob1_component_open(void) mca_pml_ob1_param_register_int("send_pipeline_depth", 3); mca_pml_ob1.recv_pipeline_depth = mca_pml_ob1_param_register_int("recv_pipeline_depth", 4); - + mca_pml_ob1.rdma_put_retries_limit = + mca_pml_ob1_param_register_int("rdma_put_retries_limit", 5); + mca_pml_ob1.unexpected_limit = mca_pml_ob1_param_register_int("unexpected_limit", 128); diff --git a/ompi/mca/pml/ob1/pml_ob1_hdr.h b/ompi/mca/pml/ob1/pml_ob1_hdr.h index 87eac7d..6d35581 100644 --- a/ompi/mca/pml/ob1/pml_ob1_hdr.h +++ b/ompi/mca/pml/ob1/pml_ob1_hdr.h @@ -219,6 +219,7 @@ struct mca_pml_ob1_fin_hdr_t { uint8_t hdr_padding[6]; #endif ompi_ptr_t hdr_des; /**< completed descriptor */ + uint32_t hdr_fail; /**< RDMA operation failed */ }; typedef struct mca_pml_ob1_fin_hdr_t mca_pml_ob1_fin_hdr_t; diff --git a/ompi/mca/pml/ob1/pml_ob1_rdmafrag.h b/ompi/mca/pml/ob1/pml_ob1_rdmafrag.h index c892c6b..1ff5ce5 100644 --- a/ompi/mca/pml/ob1/pml_ob1_rdmafrag.h +++ b/ompi/mca/pml/ob1/pml_ob1_rdmafrag.h @@ -45,6 +45,7 @@ struct mca_pml_ob1_rdma_frag_t { struct mca_bml_base_endpoint_t* rdma_ep; ompi_convertor_t convertor; mca_mpool_base_registration_t* reg; + uint32_t retries; }; typedef struct mca_pml_ob1_rdma_frag_t mca_pml_ob1_rdma_frag_t; diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index 4fab266..d8405ae 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -182,7 +182,8 @@ void mca_pml_ob1_recv_frag_callback( } #endif rdma = (mca_btl_base_descriptor_t*)hdr->hdr_fin.hdr_des.pval; - rdma->des_cbfunc(btl, NULL, rdma, OMPI_SUCCESS); + rdma->des_cbfunc(btl, NULL, rdma, + hdr->hdr_fin.hdr_fail ? 
OMPI_ERROR : OMPI_SUCCESS); break; } default: diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c index 94114af..9cf5661 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c @@ -157,8 +157,10 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, mca_pml_ob1_recv_request_t* recvreq = (mca_pml_ob1_recv_request_t*)des->des_cbdata; size_t bytes_received = 0; - MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( des->des_dst, des->des_dst_cnt, - 0, bytes_received ); + if(status == OMPI_SUCCESS) { + MCA_PML_OB1_COMPUTE_SEGMENT_LENGTH( des->des_dst, des->des_dst_cnt, + 0, bytes_received ); + } OPAL_THREAD_ADD_SIZE_T(&recvreq->req_pipeline_depth,-1); mca_bml_base_free(bml_btl, des); @@ -308,7 +310,7 @@ static void mca_pml_ob1_rget_completion( mca_pml_ob1_send_fin(recvreq->req_recv.req_base.req_proc, bml_btl, frag->rdma_hdr.hdr_rget.hdr_des.pval, - des->order); + des->order, 0); /* is receive request complete */ if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, frag->rdma_length) diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c index 8c7a3d6..606ea61 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c @@ -1074,7 +1074,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, mca_pml_ob1_send_fin(sendreq->req_send.req_base.req_proc, bml_btl, frag->rdma_hdr.hdr_rdma.hdr_des.pval, - des->order); + des->order, 0); /* check for request completion */ if( OPAL_THREAD_ADD_SIZE_T(&sendreq->req_bytes_delivered, frag->rdma_length) @@ -1115,12 +1115,27 @@ int mca_pml_ob1_send_request_put_frag( mca_pml_ob1_rdma_frag_t* frag ) &des ); if(NULL == des) { - size_t offset = (size_t)frag->rdma_hdr.hdr_rdma.hdr_rdma_offset; - frag->rdma_length = save_size; - ompi_convertor_set_position(&frag->convertor, &offset); - OPAL_THREAD_LOCK(&mca_pml_ob1.lock); - opal_list_append(&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag); - 
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock); + if(frag->retries < mca_pml_ob1.rdma_put_retries_limit) { + size_t offset = (size_t)frag->rdma_hdr.hdr_rdma.hdr_rdma_offset; + frag->rdma_length = save_size; + ompi_convertor_set_position(&frag->convertor, &offset); + OPAL_THREAD_LOCK(&mca_pml_ob1.lock); + opal_list_append(&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag); + OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock); + } else { + mca_pml_ob1_send_request_t *sendreq = + (mca_pml_ob1_send_request_t*)frag->rdma_req; + + /* tell receiver to unregister memory */ + mca_pml_ob1_send_fin(sendreq->req_send.req_base.req_proc, + frag->rdma_hdr.hdr_rdma.hdr_des.pval, bml_btl, + MCA_BTL_NO_ORDER, 1); + + /* send fragment by copy in/out */ + mca_pml_ob1_send_requst_copy_in_out(sendreq, + frag->rdma_hdr.hdr_rdma.hdr_rdma_offset, frag->rdma_length); + mca_pml_ob1_send_request_schedule(sendreq); + } return OMPI_ERR_OUT_OF_RESOURCE; } @@ -1194,6 +1209,7 @@ void mca_pml_ob1_send_request_put( frag->rdma_length = size; frag->rdma_state = MCA_PML_OB1_RDMA_PUT; frag->reg = NULL; + frag->retries = 0; /* lookup the corresponding registration */ for(i=0; i<sendreq->req_rdma_cnt; i++) {
diff --git a/ompi/mca/pml/ob1/pml_ob1.c b/ompi/mca/pml/ob1/pml_ob1.c index f08c500..690a512 100644 --- a/ompi/mca/pml/ob1/pml_ob1.c +++ b/ompi/mca/pml/ob1/pml_ob1.c @@ -339,13 +339,13 @@ void mca_pml_ob1_process_pending_packets(mca_bml_base_btl_t* bml_btl) send_dst, pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_dst_req.pval, - pckt->hdr.hdr_ack.hdr_rdma_offset); + pckt->hdr.hdr_ack.hdr_send_offset); MCA_PML_OB1_PCKT_PENDING_RETURN(pckt); if(OMPI_ERR_OUT_OF_RESOURCE == rc) { MCA_PML_OB1_ADD_ACK_TO_PENDING(pckt->proc, pckt->hdr.hdr_ack.hdr_src_req.lval, pckt->hdr.hdr_ack.hdr_dst_req.pval, - pckt->hdr.hdr_ack.hdr_rdma_offset); + pckt->hdr.hdr_ack.hdr_send_offset); return; } break; diff --git a/ompi/mca/pml/ob1/pml_ob1.h b/ompi/mca/pml/ob1/pml_ob1.h index 49c3f3f..bb371f5 100644 --- a/ompi/mca/pml/ob1/pml_ob1.h +++ b/ompi/mca/pml/ob1/pml_ob1.h @@ -67,6 +67,7 @@ struct mca_pml_ob1_t { ompi_free_list_t recv_frags; ompi_free_list_t pending_pckts; ompi_free_list_t buffers; + ompi_free_list_t send_ranges; /* list of pending operations */ opal_list_t pckt_pending; diff --git a/ompi/mca/pml/ob1/pml_ob1_component.c b/ompi/mca/pml/ob1/pml_ob1_component.c index 1e3ccdd..198564d 100644 --- a/ompi/mca/pml/ob1/pml_ob1_component.c +++ b/ompi/mca/pml/ob1/pml_ob1_component.c @@ -197,7 +197,16 @@ int mca_pml_ob1_component_open(void) OBJ_CONSTRUCT(&mca_pml_ob1.buffers, ompi_free_list_t); - + OBJ_CONSTRUCT(&mca_pml_ob1.send_ranges, ompi_free_list_t); + ompi_free_list_init( + &mca_pml_ob1.send_ranges, + sizeof(mca_pml_ob1_send_range_t), + OBJ_CLASS(mca_pml_ob1_send_range_t), + mca_pml_ob1.free_list_num, + mca_pml_ob1.free_list_max, + mca_pml_ob1.free_list_inc, + NULL); + /* pending operations */ OBJ_CONSTRUCT(&mca_pml_ob1.send_pending, opal_list_t); OBJ_CONSTRUCT(&mca_pml_ob1.recv_pending, opal_list_t); diff --git a/ompi/mca/pml/ob1/pml_ob1_hdr.h b/ompi/mca/pml/ob1/pml_ob1_hdr.h index 12c9f25..87eac7d 100644 --- a/ompi/mca/pml/ob1/pml_ob1_hdr.h +++ 
b/ompi/mca/pml/ob1/pml_ob1_hdr.h @@ -172,7 +172,7 @@ struct mca_pml_ob1_ack_hdr_t { #endif ompi_ptr_t hdr_src_req; /**< source request */ ompi_ptr_t hdr_dst_req; /**< matched receive request */ - uint64_t hdr_rdma_offset; /**< starting point rdma protocol */ + uint64_t hdr_send_offset; /**< starting point of copy in/out */ }; typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; @@ -183,13 +183,13 @@ typedef struct mca_pml_ob1_ack_hdr_t mca_pml_ob1_ack_hdr_t; #define MCA_PML_OB1_ACK_HDR_NTOH(h) \ do { \ MCA_PML_OB1_COMMON_HDR_NTOH(h.hdr_common); \ - (h).hdr_rdma_offset = ntoh64((h).hdr_rdma_offset); \ + (h).hdr_send_offset = ntoh64((h).hdr_send_offset); \ } while (0) #define MCA_PML_OB1_ACK_HDR_HTON(h) \ do { \ MCA_PML_OB1_COMMON_HDR_HTON((h).hdr_common); \ - (h).hdr_rdma_offset = hton64((h).hdr_rdma_offset); \ + (h).hdr_send_offset = hton64((h).hdr_send_offset); \ } while (0) /** diff --git a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c index 7505019..4fab266 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvfrag.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvfrag.c @@ -128,7 +128,16 @@ void mca_pml_ob1_recv_frag_callback( #endif sendreq = (mca_pml_ob1_send_request_t*)hdr->hdr_ack.hdr_src_req.pval; sendreq->req_recv = hdr->hdr_ack.hdr_dst_req; - sendreq->req_rdma_offset = (size_t)hdr->hdr_ack.hdr_rdma_offset; + + /* if the request should be delivered entirely by copy in/out + * then throttle sends */ + if(sendreq->req_bytes_delivered == hdr->hdr_ack.hdr_send_offset) + sendreq->req_throttle_sends = true; + + mca_pml_ob1_send_requst_copy_in_out(sendreq, + hdr->hdr_ack.hdr_send_offset, + sendreq->req_send.req_bytes_packed - + hdr->hdr_ack.hdr_send_offset); if(OPAL_THREAD_ADD32(&sendreq->req_state, 1) == 2 && sendreq->req_bytes_delivered >= sendreq->req_send.req_bytes_packed) { diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c index ba450f8..94114af 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c +++ 
b/ompi/mca/pml/ob1/pml_ob1_recvreq.c @@ -166,7 +166,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); - } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { + } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); } @@ -179,7 +179,7 @@ static void mca_pml_ob1_put_completion( mca_btl_base_module_t* btl, int mca_pml_ob1_recv_request_ack_send_btl( ompi_proc_t* proc, mca_bml_base_btl_t* bml_btl, - uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) + uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset) { mca_btl_base_descriptor_t* des; mca_pml_ob1_ack_hdr_t* ack; @@ -197,7 +197,7 @@ int mca_pml_ob1_recv_request_ack_send_btl( ack->hdr_common.hdr_flags = 0; ack->hdr_src_req.lval = hdr_src_req; ack->hdr_dst_req.pval = hdr_dst_req; - ack->hdr_rdma_offset = hdr_rdma_offset; + ack->hdr_send_offset = hdr_send_offset; #if OMPI_ENABLE_HETEROGENEOUS_SUPPORT #ifdef WORDS_BIGENDIAN @@ -235,11 +235,9 @@ static int mca_pml_ob1_recv_request_ack( bml_endpoint = (mca_bml_base_endpoint_t*) proc->proc_bml; - /* by default copy */ - recvreq->req_rdma_offset = hdr->hdr_msg_length; + /* by default copy everything */ + recvreq->req_send_offset = bytes_received; if(hdr->hdr_msg_length > bytes_received) { - - /* * lookup request buffer to determine if memory is already * registered. 
@@ -259,29 +257,30 @@ static int mca_pml_ob1_recv_request_ack( if (hdr->hdr_match.hdr_common.hdr_flags & MCA_PML_OB1_HDR_FLAGS_PIN && recvreq->req_rdma_cnt != 0) { - recvreq->req_rdma_offset = bytes_received; + recvreq->req_send_offset = hdr->hdr_msg_length; /* are rdma devices available for long rdma protocol */ } else if (bml_endpoint->btl_send_limit < hdr->hdr_msg_length && bml_endpoint->btl_rdma_offset < hdr->hdr_msg_length && mca_bml_base_btl_array_get_size(&bml_endpoint->btl_rdma)) { /* use convertor to figure out the rdma offset for this request */ - recvreq->req_rdma_offset = bml_endpoint->btl_rdma_offset; - if(recvreq->req_rdma_offset < bytes_received) { - recvreq->req_rdma_offset = bytes_received; + recvreq->req_send_offset = hdr->hdr_msg_length - + bml_endpoint->btl_rdma_offset; + if(recvreq->req_send_offset < bytes_received) { + recvreq->req_send_offset = bytes_received; } ompi_convertor_set_position( &recvreq->req_recv.req_convertor, - &recvreq->req_rdma_offset ); + &recvreq->req_send_offset ); } } - /* start rdma at current fragment offset - no need to ack */ - if(recvreq->req_rdma_offset == bytes_received) + /* nothing to send by copy in/out - no need to ack */ + if(recvreq->req_send_offset == hdr->hdr_msg_length) return OMPI_SUCCESS; } /* let know to shedule function there is no need to put ACK flag */ recvreq->req_ack_sent = true; return mca_pml_ob1_recv_request_ack_send(proc, hdr->hdr_src_req.lval, - recvreq, recvreq->req_rdma_offset); + recvreq, recvreq->req_send_offset); } @@ -473,6 +472,7 @@ void mca_pml_ob1_recv_request_progress( bytes_received -= sizeof(mca_pml_ob1_rendezvous_hdr_t); recvreq->req_recv.req_bytes_packed = hdr->hdr_rndv.hdr_msg_length; recvreq->req_send = hdr->hdr_rndv.hdr_src_req; + recvreq->req_rdma_offset = bytes_received; MCA_PML_OB1_RECV_REQUEST_MATCHED(recvreq,&hdr->hdr_match); mca_pml_ob1_recv_request_ack(recvreq, &hdr->hdr_rndv, bytes_received); if(recvreq->req_recv.req_base.req_pml_complete) { @@ -537,7 +537,7 @@ void 
mca_pml_ob1_recv_request_progress( if( OPAL_THREAD_ADD_SIZE_T(&recvreq->req_bytes_received, bytes_received) >= recvreq->req_recv.req_bytes_packed ) { MCA_PML_OB1_RECV_REQUEST_PML_COMPLETE( recvreq ); - } else if (recvreq->req_rdma_offset < recvreq->req_recv.req_bytes_packed) { + } else if (recvreq->req_rdma_offset < recvreq->req_send_offset) { /* schedule additional rdma operations */ mca_pml_ob1_recv_request_schedule(recvreq); } @@ -599,7 +599,7 @@ int mca_pml_ob1_recv_request_schedule_exclusive( mca_pml_ob1_recv_request_t* rec num_tries = recvreq->req_rdma_cnt; do { - size_t bytes_remaining = recvreq->req_recv.req_bytes_packed - + size_t bytes_remaining = recvreq->req_send_offset - recvreq->req_rdma_offset; size_t prev_bytes_remaining = 0; int num_fail = 0; diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.h b/ompi/mca/pml/ob1/pml_ob1_recvreq.h index c9e44ca..0928e02 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.h @@ -47,6 +47,7 @@ struct mca_pml_ob1_recv_request_t { size_t req_bytes_received; size_t req_bytes_delivered; size_t req_rdma_offset; + size_t req_send_offset; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; uint32_t req_rdma_cnt; uint32_t req_rdma_idx; @@ -350,7 +351,7 @@ static inline void mca_pml_ob1_recv_request_schedule( _pckt->hdr.hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_ACK; \ _pckt->hdr.hdr_ack.hdr_src_req.lval = (S); \ _pckt->hdr.hdr_ack.hdr_dst_req.pval = (D); \ - _pckt->hdr.hdr_ack.hdr_rdma_offset = (O); \ + _pckt->hdr.hdr_ack.hdr_send_offset = (O); \ _pckt->proc = (P); \ _pckt->bml_btl = NULL; \ OPAL_THREAD_LOCK(&mca_pml_ob1.lock); \ @@ -364,7 +365,7 @@ int mca_pml_ob1_recv_request_ack_send_btl(ompi_proc_t* proc, uint64_t hdr_rdma_offset); static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc, - uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_rdma_offset) + uint64_t hdr_src_req, void *hdr_dst_req, uint64_t hdr_send_offset) { size_t i; mca_bml_base_btl_t* bml_btl; 
@@ -374,12 +375,12 @@ static inline int mca_pml_ob1_recv_request_ack_send(ompi_proc_t* proc, for(i = 0; i < mca_bml_base_btl_array_get_size(&endpoint->btl_eager); i++) { bml_btl = mca_bml_base_btl_array_get_next(&endpoint->btl_eager); if(mca_pml_ob1_recv_request_ack_send_btl(proc, bml_btl, hdr_src_req, - hdr_dst_req, hdr_rdma_offset) == OMPI_SUCCESS) + hdr_dst_req, hdr_send_offset) == OMPI_SUCCESS) return OMPI_SUCCESS; } MCA_PML_OB1_ADD_ACK_TO_PENDING(proc, hdr_src_req, hdr_dst_req, - hdr_rdma_offset); + hdr_send_offset); return OMPI_ERR_OUT_OF_RESOURCE; } diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c index 1bd6487..8c7a3d6 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c @@ -33,6 +33,8 @@ #include "ompi/mca/bml/base/base.h" #include "ompi/datatype/dt_arch.h" +OBJ_CLASS_INSTANCE(mca_pml_ob1_send_range_t, ompi_free_list_item_t, + NULL, NULL); void mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl) { @@ -131,6 +133,8 @@ static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req) req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel; req->req_rdma_cnt = 0; + req->req_throttle_sends = false; + OBJ_CONSTRUCT(&req->req_send_ranges, opal_list_t); } OBJ_CLASS_INSTANCE( @@ -435,7 +439,6 @@ int mca_pml_ob1_send_request_start_buffered( /* update lengths */ segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t) + max_data; - sendreq->req_send_offset = max_data; descriptor->des_cbfunc = mca_pml_ob1_rndv_completion; descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -448,8 +451,8 @@ int mca_pml_ob1_send_request_start_buffered( return rc; } - iov.iov_base = (IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + sendreq->req_send_offset); - iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - sendreq->req_send_offset; + iov.iov_base = 
(IOVBASE_TYPE*)(((unsigned char*)sendreq->req_send.req_addr) + max_data); + iov.iov_len = max_data = sendreq->req_send.req_bytes_packed - max_data; if((rc = ompi_convertor_pack( &sendreq->req_send.req_convertor, &iov, @@ -549,8 +552,6 @@ int mca_pml_ob1_send_request_start_copy( mca_pml_ob1_send_request_t* sendreq, /* update lengths */ segment->seg_len = sizeof(mca_pml_ob1_match_hdr_t) + max_data; - sendreq->req_send_offset = max_data; - sendreq->req_rdma_offset = max_data; /* short message */ descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; @@ -625,10 +626,6 @@ int mca_pml_ob1_send_request_start_prepare( mca_pml_ob1_send_request_t* sendreq, /* short message */ descriptor->des_cbfunc = mca_pml_ob1_match_completion_free; - /* update lengths */ - sendreq->req_send_offset = size; - sendreq->req_rdma_offset = size; - descriptor->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; descriptor->des_cbdata = sendreq; @@ -786,7 +783,6 @@ int mca_pml_ob1_send_request_start_rdma( /* update lengths with number of bytes actually packed */ segment->seg_len = sizeof(mca_pml_ob1_rendezvous_hdr_t); - sendreq->req_rdma_offset = 0; /* first fragment of a long message */ des->des_cbfunc = mca_pml_ob1_rndv_completion; @@ -873,8 +869,6 @@ int mca_pml_ob1_send_request_start_rndv( des->des_flags |= MCA_BTL_DES_FLAGS_PRIORITY; des->des_cbdata = sendreq; des->des_cbfunc = mca_pml_ob1_rndv_completion; - sendreq->req_send_offset = size; - sendreq->req_rdma_offset = size; /* send */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); @@ -884,6 +878,26 @@ int mca_pml_ob1_send_request_start_rndv( return rc; } +void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq, + uint64_t send_offset, uint64_t send_length) +{ + mca_pml_ob1_send_range_t *sr; + ompi_free_list_item_t *i; + int rc = OMPI_SUCCESS; + + if(0 == send_length) + return; + + OMPI_FREE_LIST_WAIT(&mca_pml_ob1.send_ranges, i, rc); + + sr = (mca_pml_ob1_send_range_t*)i; + + sr->range_send_offset = send_offset; + 
sr->range_send_length = send_length; + OPAL_THREAD_LOCK(&sendreq->req_send_range_lock); + opal_list_append(&sendreq->req_send_ranges, (opal_list_item_t*)sr); + OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock); +} /** * Schedule pipeline of send descriptors for the given request. @@ -901,33 +915,47 @@ int mca_pml_ob1_send_request_schedule_exclusive( mca_bml_base_btl_array_get_size(&bml_endpoint->btl_send); do { - /* allocate remaining bytes to BTLs */ - size_t bytes_remaining = sendreq->req_rdma_offset - - sendreq->req_send_offset; size_t prev_bytes_remaining = 0, num_fail = 0; + mca_pml_ob1_send_range_t *range = NULL; - if(bytes_remaining == 0) { - OPAL_THREAD_ADD32(&sendreq->req_lock, -sendreq->req_lock); - return OMPI_SUCCESS; - } - while((int32_t)bytes_remaining > 0 && - (sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth - || - sendreq->req_rdma_offset < sendreq->req_send.req_bytes_packed)) - { + while(true) { mca_pml_ob1_frag_hdr_t* hdr; mca_btl_base_descriptor_t* des; int rc; - size_t size; + size_t size; + opal_list_item_t *item; mca_bml_base_btl_t* bml_btl = - mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + mca_bml_base_btl_array_get_next(&bml_endpoint->btl_send); + + if(NULL == range || 0 == range->range_send_length) { + OPAL_THREAD_LOCK(&sendreq->req_send_range_lock); + if(range) { + opal_list_remove_first(&sendreq->req_send_ranges); + OMPI_FREE_LIST_RETURN(&mca_pml_ob1.send_ranges, + &range->base); + } + + item = opal_list_get_first(&sendreq->req_send_ranges); + OPAL_THREAD_UNLOCK(&sendreq->req_send_range_lock); + + if(opal_list_get_end(&sendreq->req_send_ranges) == item) + break; + + range = (mca_pml_ob1_send_range_t*)item; + prev_bytes_remaining = 0; + } - if(prev_bytes_remaining == bytes_remaining) + if(true == sendreq->req_throttle_sends && + sendreq->req_pipeline_depth >= + mca_pml_ob1.send_pipeline_depth) + break; + + if(prev_bytes_remaining == range->range_send_length) num_fail++; else num_fail = 0; - prev_bytes_remaining = 
bytes_remaining; + prev_bytes_remaining = range->range_send_length; if (num_fail == num_btl_avail) { assert(sendreq->req_pending == MCA_PML_OB1_SEND_PENDING_NONE); @@ -940,15 +968,15 @@ int mca_pml_ob1_send_request_schedule_exclusive( } if(num_btl_avail == 1 || - bytes_remaining < bml_btl->btl_min_send_size) { - size = bytes_remaining; + range->range_send_length < bml_btl->btl_min_send_size) { + size = range->range_send_length; } else { /* otherwise attempt to give the BTL a percentage of the message * based on a weighting factor. for simplicity calculate this as * a percentage of the overall message length (regardless of * amount previously assigned) */ - size = (size_t)(bml_btl->btl_weight * bytes_remaining); + size = (size_t)(bml_btl->btl_weight * range->range_send_length); } /* makes sure that we don't exceed BTL max send size */ @@ -959,7 +987,7 @@ int mca_pml_ob1_send_request_schedule_exclusive( /* pack into a descriptor */ ompi_convertor_set_position(&sendreq->req_send.req_convertor, - &sendreq->req_send_offset); + &range->range_send_offset); mca_bml_base_prepare_src(bml_btl, NULL, &sendreq->req_send.req_convertor, @@ -975,7 +1003,7 @@ int mca_pml_ob1_send_request_schedule_exclusive( hdr = (mca_pml_ob1_frag_hdr_t*)des->des_src->seg_addr.pval; hdr->hdr_common.hdr_flags = 0; hdr->hdr_common.hdr_type = MCA_PML_OB1_HDR_TYPE_FRAG; - hdr->hdr_frag_offset = sendreq->req_send_offset; + hdr->hdr_frag_offset = range->range_send_offset; hdr->hdr_src_req.pval = sendreq; hdr->hdr_dst_req = sendreq->req_recv; @@ -996,25 +1024,18 @@ int mca_pml_ob1_send_request_schedule_exclusive( #endif #endif - /* - * The if-clause should be optimized away, in case the macro - * extends to ; - */ #if OMPI_WANT_PERUSE - if( 0 != sendreq->req_send_offset ) { - PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE, - &(sendreq->req_send.req_base), - size, PERUSE_SEND); - } + PERUSE_TRACE_COMM_OMPI_EVENT(PERUSE_COMM_REQ_XFER_CONTINUE, + &(sendreq->req_send.req_base), size, PERUSE_SEND); 
#endif /* OMPI_WANT_PERUSE */ /* initiate send - note that this may complete before the call returns */ rc = mca_bml_base_send(bml_btl, des, MCA_BTL_TAG_PML); if(rc == OMPI_SUCCESS) { - bytes_remaining -= size; + range->range_send_length -= size; /* update state */ - sendreq->req_send_offset += size; + range->range_send_offset += size; OPAL_THREAD_ADD_SIZE_T(&sendreq->req_pipeline_depth, 1); } else { mca_bml_base_free(bml_btl,des); diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.h b/ompi/mca/pml/ob1/pml_ob1_sendreq.h index 7bd4e2a..75b7afc 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.h +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.h @@ -51,18 +51,26 @@ struct mca_pml_ob1_send_request_t { int32_t req_state; int32_t req_lock; #endif + bool req_throttle_sends; size_t req_pipeline_depth; size_t req_bytes_delivered; - size_t req_send_offset; - size_t req_rdma_offset; mca_pml_ob1_rdma_btl_t req_rdma[MCA_PML_OB1_MAX_RDMA_PER_REQUEST]; uint32_t req_rdma_cnt; mca_pml_ob1_send_pending_t req_pending; + opal_mutex_t req_send_range_lock; + opal_list_t req_send_ranges; }; typedef struct mca_pml_ob1_send_request_t mca_pml_ob1_send_request_t; OBJ_CLASS_DECLARATION(mca_pml_ob1_send_request_t); +struct mca_pml_ob1_send_range_t { + ompi_free_list_item_t base; + uint64_t range_send_offset; + uint64_t range_send_length; +}; +typedef struct mca_pml_ob1_send_range_t mca_pml_ob1_send_range_t; +OBJ_CLASS_DECLARATION(mca_pml_ob1_send_range_t); #define MCA_PML_OB1_SEND_REQUEST_ALLOC( \ comm, \ @@ -328,7 +336,6 @@ static inline int mca_pml_ob1_send_request_start( sendreq->req_lock = 0; sendreq->req_pipeline_depth = 0; sendreq->req_bytes_delivered = 0; - sendreq->req_send_offset = 0; sendreq->req_pending = MCA_PML_OB1_SEND_PENDING_NONE; sendreq->req_send.req_base.req_sequence = OPAL_THREAD_ADD32( &comm->procs[sendreq->req_send.req_base.req_peer].send_sequence,1); @@ -394,6 +401,8 @@ int mca_pml_ob1_send_request_put_frag(mca_pml_ob1_rdma_frag_t* frag); * should be considered for sending packets */ void 
mca_pml_ob1_send_request_process_pending(mca_bml_base_btl_t *bml_btl); +void mca_pml_ob1_send_requst_copy_in_out(mca_pml_ob1_send_request_t *sendreq, + uint64_t send_offset, uint64_t send_length); #if defined(c_plusplus) || defined(__cplusplus) } #endif