Applied all 5 patches. Thanks. -arlin
>-----Original Message----- >From: [email protected] >[mailto:[email protected]] On Behalf Of Sean Hefty >Sent: Friday, January 30, 2009 10:59 AM >To: Hefty, Sean; OpenIB; [email protected] >Subject: [ofw] [PATCH 5/5] [DAPL] dapl/ibal-scm: update >ibal-scm provider > >From: Stan Smith <[email protected]> > >Update the dapl.git tree with the latest SVN version of the >ibal-scm provider. > >Signed-off-by: Sean Hefty <[email protected]> >--- >Actual coding changes were made by Stan. I'm just submitting the patch >to update the DAPL git repository. > > dapl/ibal-scm/dapl_ibal-scm_cm.c | 775 >+++++++++++++++++++++++++----------- > dapl/ibal-scm/dapl_ibal-scm_util.c | 44 ++ > 2 files changed, 584 insertions(+), 235 deletions(-) > >diff --git a/dapl/ibal-scm/dapl_ibal-scm_cm.c >b/dapl/ibal-scm/dapl_ibal-scm_cm.c >index df83008..6a050b8 100644 >--- a/dapl/ibal-scm/dapl_ibal-scm_cm.c >+++ b/dapl/ibal-scm/dapl_ibal-scm_cm.c >@@ -63,6 +63,88 @@ > #include <ws2tcpip.h> > #include <io.h> > >+extern int g_scm_pipe[2]; >+ >+extern DAT_RETURN >+dapls_ib_query_gid( IN DAPL_HCA *hca_ptr, >+ IN GID *gid ); >+ >+ >+static struct ib_cm_handle * dapli_cm_create(void) >+{ >+ struct ib_cm_handle *cm_ptr; >+ >+ /* Allocate CM, init lock, and initialize */ >+ if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) >+ return NULL; >+ >+ if (dapl_os_lock_init(&cm_ptr->lock)) >+ goto bail; >+ >+ (void)dapl_os_memzero(cm_ptr, sizeof(*cm_ptr)); >+ cm_ptr->dst.ver = htons(DSCM_VER); >+ cm_ptr->socket = -1; >+ cm_ptr->l_socket = -1; >+ return cm_ptr; >+bail: >+ dapl_os_free(cm_ptr, sizeof(*cm_ptr)); >+ return NULL; >+} >+ >+ >+/* mark for destroy, remove all references, schedule cleanup */ >+ >+static void dapli_cm_destroy(struct ib_cm_handle *cm_ptr) >+{ >+ dapl_dbg_log(DAPL_DBG_TYPE_CM, >+ " cm_destroy: cm %p ep %p\n", cm_ptr,cm_ptr->ep); >+ >+ /* cleanup, never made it to work queue */ >+ if (cm_ptr->state == SCM_INIT) { >+ if (cm_ptr->socket >= 0) >+ closesocket(cm_ptr->socket); >+ if 
(cm_ptr->l_socket >= 0) >+ closesocket(cm_ptr->l_socket); >+ dapl_os_free(cm_ptr, sizeof(*cm_ptr)); >+ return; >+ } >+ >+ dapl_os_lock(&cm_ptr->lock); >+ cm_ptr->state = SCM_DESTROY; >+ if (cm_ptr->ep) >+ cm_ptr->ep->cm_handle = IB_INVALID_HANDLE; >+ >+ /* close socket if still active */ >+ if (cm_ptr->socket >= 0) { >+ closesocket(cm_ptr->socket); >+ cm_ptr->socket = -1; >+ } >+ if (cm_ptr->l_socket >= 0) { >+ closesocket(cm_ptr->l_socket); >+ cm_ptr->l_socket = -1; >+ } >+ dapl_os_unlock(&cm_ptr->lock); >+ >+ /* wakeup work thread */ >+ _write(g_scm_pipe[1], "w", sizeof "w"); >+} >+ >+ >+/* queue socket for processing CM work */ >+static void dapli_cm_queue(struct ib_cm_handle *cm_ptr) >+{ >+ /* add to work queue for cr thread processing */ >+ dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry); >+ dapl_os_lock( &cm_ptr->hca->ib_trans.lock ); >+ >dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca->ib_trans.list, >+ (DAPL_LLIST_ENTRY*)&cm_ptr->entry, >(void*)cm_ptr); >+ dapl_os_unlock(&cm_ptr->hca->ib_trans.lock); >+ >+ /* wakeup CM work thread */ >+ _write(g_scm_pipe[1], "w", sizeof "w"); >+} >+ >+ > > static uint16_t > dapli_get_lid(IN DAPL_HCA *hca, IN int port) >@@ -123,6 +205,263 @@ dapli_get_lid(IN DAPL_HCA *hca, IN int port) > > > /* >+ * ACTIVE/PASSIVE: called from CR thread or consumer via ep_disconnect >+ */ >+static DAT_RETURN >+dapli_socket_disconnect(dp_ib_cm_handle_t cm_ptr) >+{ >+ DAPL_EP *ep_ptr = cm_ptr->ep; >+ DAT_UINT32 disc_data = htonl(0xdead); >+ >+ if (ep_ptr == NULL) >+ return DAT_SUCCESS; >+ >+ dapl_os_lock(&cm_ptr->lock); >+ if ((cm_ptr->state == SCM_INIT) || >+ (cm_ptr->state == SCM_DISCONNECTED)) { >+ dapl_os_unlock(&cm_ptr->lock); >+ return DAT_SUCCESS; >+ } else { >+ /* send disc date, close socket, schedule destroy */ >+ if (cm_ptr->socket >= 0) { >+ send(cm_ptr->socket, (const char *)&disc_data, >+ sizeof(disc_data), 0); >+ closesocket(cm_ptr->socket); >+ cm_ptr->socket = -1; >+ } >+ cm_ptr->state = SCM_DISCONNECTED; >+ 
_write(g_scm_pipe[1], "w", sizeof "w"); >+ } >+ dapl_os_unlock(&cm_ptr->lock); >+ >+ >+ if (ep_ptr->cr_ptr) { >+ dapls_cr_callback(cm_ptr, >+ IB_CME_DISCONNECTED, >+ NULL, >+ ((DAPL_CR *)ep_ptr->cr_ptr)->sp_ptr); >+ } else { >+ dapl_evd_connection_callback(ep_ptr->cm_handle, >+ IB_CME_DISCONNECTED, >+ NULL, >+ ep_ptr); >+ } >+ >+ /* remove reference from endpoint */ >+ ep_ptr->cm_handle = NULL; >+ >+ /* schedule destroy */ >+ >+ >+ return DAT_SUCCESS; >+} >+ >+ >+ >+/* >+ * PASSIVE: consumer accept, send local QP information, private data, >+ * queue on work thread to receive RTU information to avoid blocking >+ * user thread. >+ */ >+static DAT_RETURN >+dapli_socket_accept_usr( DAPL_EP *ep_ptr, >+ DAPL_CR *cr_ptr, >+ DAT_COUNT p_size, >+ DAT_PVOID p_data ) >+{ >+ DAPL_IA *ia_ptr = ep_ptr->header.owner_ia; >+ dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle; >+ WSABUF iovec[2]; >+ int len, rc; >+ short rtu_data = 0; >+ ib_api_status_t ibs; >+ ib_qp_attr_t qpa; >+ dapl_ibal_port_t *p_port; >+ dapl_ibal_ca_t *p_ca; >+ >+ dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d >port 0x%x\n", >+ __FUNCTION__,p_size,cm_ptr->socket, >+ ia_ptr->hca_ptr->port_num); >+ >+ if (p_size > IB_MAX_REP_PDATA_SIZE) >+ return DAT_LENGTH_ERROR; >+ >+ /* must have a accepted socket */ >+ if ( cm_ptr->socket < 0 ) { >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, >+ "%s() Not accepted socket? 
remote >port=0x%x lid=0x%x" >+ " qpn=0x%x psize=%d\n", >+ cm_ptr->dst.port, cm_ptr->dst.lid, >+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size); >+ return DAT_INTERNAL_ERROR; >+ } >+ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, >+ " accept_usr: remote port=0x%x lid=0x%x" >+ " qpn=0x%x psize=%d\n", >+ cm_ptr->dst.port, cm_ptr->dst.lid, >+ ntohs(cm_ptr->dst.qpn), cm_ptr->dst.p_size); >+ >+ /* modify QP to RTR and then to RTS with remote info >already read */ >+ >+ p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle; >+ p_port = dapli_ibal_get_port (p_ca, >(uint8_t)ia_ptr->hca_ptr->port_num); >+ if (!p_port) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "%s() dapli_ibal_get_port() failed @ >line #%d\n", >+ __FUNCTION__,__LINE__); >+ goto bail; >+ } >+ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, >+ "%s() DST: qpn 0x%x port 0x%x lid %x >psize %d\n", >+ __FUNCTION__, >+ cl_ntoh32(cm_ptr->dst.qpn), >+ cm_ptr->dst.port, >+ cl_ntoh16(cm_ptr->dst.lid), >cm_ptr->dst.p_size); >+ >+ /* modify QP to RTR and then to RTS with remote info */ >+ >+ ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, >+ cm_ptr->dst.qpn, >+ cm_ptr->dst.lid, >+ p_port ); >+ if (ibs != IB_SUCCESS) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "%s() QP --> RTR failed @ line #%d\n", >+ __FUNCTION__,__LINE__); >+ goto bail; >+ } >+ >+ if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) ) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "%s() QP --> RTS failed @ line #%d\n", >+ __FUNCTION__,__LINE__); >+ goto bail; >+ } >+ >+ ep_ptr->qp_state = IB_QP_STATE_RTS; >+ >+ /* save remote address information */ >+ dapl_os_memcpy( &ep_ptr->remote_ia_address, >+ &cm_ptr->dst.ia_address, >+ sizeof(ep_ptr->remote_ia_address)); >+ >+ /* determine QP & port numbers */ >+ ibs = ib_query_qp(ep_ptr->qp_handle, &qpa); >+ if (ibs != IB_SUCCESS) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ " ib_query_qp() ERR %s\n", >ib_get_err_str(ibs)); >+ goto bail; >+ } >+ >+ /* Send our QP info, IA address, and private data */ >+ cm_ptr->dst.qpn = qpa.num; /* 
ib_net32_t */ >+ cm_ptr->dst.port = ia_ptr->hca_ptr->port_num; >+ cm_ptr->dst.lid = dapli_get_lid(ia_ptr->hca_ptr, >ia_ptr->hca_ptr->port_num); >+ /* set gid in network order */ >+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid ); >+ if ( ibs != IB_SUCCESS ) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "%s() dapls_ib_query_gid() returns '%s'\n", >+ __FUNCTION__,ib_get_err_str(ibs)); >+ goto bail; >+ } >+ >+ cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address; >+ cm_ptr->dst.p_size = p_size; >+ >+ dapl_dbg_log(DAPL_DBG_TYPE_CM, >+ "%s()\n Tx QP info: qpn %x port 0x%x lid 0x%x >p_sz %d IP %s\n", >+ __FUNCTION__, cl_ntoh32(cm_ptr->dst.qpn), >cm_ptr->dst.port, >+ cl_ntoh16(cm_ptr->dst.lid), cm_ptr->dst.p_size, >+ dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL)); >+ >+ /* network byte-ordering - QPN & LID already are */ >+ cm_ptr->dst.p_size = cl_hton32(cm_ptr->dst.p_size); >+ cm_ptr->dst.port = cl_hton16(cm_ptr->dst.port); >+ >+ iovec[0].buf = (char*)&cm_ptr->dst; >+ iovec[0].len = sizeof(ib_qp_cm_t); >+ if (p_size) { >+ iovec[1].buf = p_data; >+ iovec[1].len = p_size; >+ } >+ rc = WSASend( cm_ptr->socket, iovec, (p_size ? 
2:1), >&len, 0, 0, 0 ); >+ if (rc || len != (p_size + sizeof(ib_qp_cm_t))) { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ " accept_usr: ERR %d, wcnt=%d\n", >+ WSAGetLastError(), len); >+ goto bail; >+ } >+ dapl_dbg_log(DAPL_DBG_TYPE_CM, >+ " accept_usr: local port=0x%x lid=0x%x" >+ " qpn=0x%x psize=%d\n", >+ ntohs(cm_ptr->dst.port), ntohs(cm_ptr->dst.lid), >+ ntohl(cm_ptr->dst.qpn), >ntohl(cm_ptr->dst.p_size)); >+ >+ /* save state and reference to EP, queue for RTU data */ >+ cm_ptr->ep = ep_ptr; >+ cm_ptr->hca = ia_ptr->hca_ptr; >+ cm_ptr->state = SCM_ACCEPTED; >+ >+ /* restore remote address information for query */ >+ dapl_os_memcpy( &cm_ptr->dst.ia_address, >+ &ep_ptr->remote_ia_address, >+ sizeof(cm_ptr->dst.ia_address)); >+ >+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: accepted!\n" ); >+ dapli_cm_queue(cm_ptr); >+ >+ return DAT_SUCCESS; >+ >+bail: >+ dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_usr: ERR >!QP_RTR_RTS \n"); >+ dapli_cm_destroy(cm_ptr); >+ dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */ >+ >+ return DAT_INTERNAL_ERROR; >+} >+ >+ >+/* >+ * PASSIVE: read RTU from active peer, post CONN event >+ */ >+void >+dapli_socket_accept_rtu(dp_ib_cm_handle_t cm_ptr) >+{ >+ int len; >+ short rtu_data = 0; >+ >+ /* complete handshake after final QP state change */ >+ len = recv(cm_ptr->socket, (char*)&rtu_data, >sizeof(rtu_data), 0); >+ if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ " accept_rtu: ERR %d, rcnt=%d rdata=%x\n", >+ WSAGetLastError(), len, ntohs(rtu_data) ); >+ goto bail; >+ } >+ >+ /* save state and reference to EP, queue for disc event */ >+ cm_ptr->state = SCM_CONNECTED; >+ >+ /* final data exchange if remote QP state is good to go */ >+ dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); >+ dapls_cr_callback(cm_ptr, IB_CME_CONNECTED, NULL, cm_ptr->sp); >+ return; >+bail: >+ dapls_ib_reinit_ep(cm_ptr->ep); /* reset QP state */ >+ dapli_cm_destroy(cm_ptr); >+ dapls_cr_callback(cm_ptr, 
IB_CME_DESTINATION_REJECT, >NULL, cm_ptr->sp); >+} >+ >+ >+/* > * ACTIVE: Create socket, connect, and exchange QP information > */ > static DAT_RETURN >@@ -143,21 +482,16 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > dapl_ibal_port_t *p_port; > dapl_ibal_ca_t *p_ca; > >- dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual >%d\n", r_qual); >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: r_qual %d >psize %d\n", >+ r_qual, p_size); > >- /* >- * Allocate CM and initialize >- */ >- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL ) { >+ cm_ptr = dapli_cm_create(); >+ if (cm_ptr == NULL) > return DAT_INSUFFICIENT_RESOURCES; >- } >- >- (void) dapl_os_memzero( cm_ptr, sizeof(*cm_ptr) ); >- cm_ptr->socket = -1; > > /* create, connect, sockopt, and exchange QP information */ > if ((cm_ptr->socket = socket(AF_INET,SOCK_STREAM,0)) < 0 ) { >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); >+ dapli_cm_destroy(cm_ptr); > return DAT_INSUFFICIENT_RESOURCES; > } > >@@ -166,7 +500,7 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > if (connect(cm_ptr->socket, r_addr, sizeof(*r_addr)) >== SOCKET_ERROR) { > dapl_dbg_log(DAPL_DBG_TYPE_ERR, " connect: %d >on r_qual %d\n", > WSAGetLastError(), (unsigned int)r_qual); >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); >+ dapli_cm_destroy(cm_ptr); > return DAT_INVALID_ADDRESS; > } > >@@ -175,6 +509,8 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > (const char*)&opt, > sizeof(opt) ); > >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " socket connected!\n"); >+ > /* determine QP & port numbers */ > ibs = ib_query_qp(ep_ptr->qp_handle, &qpa); > if (ibs != IB_SUCCESS) >@@ -187,7 +523,6 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > /* Send QP info, IA address and private data */ > cm_ptr->dst.qpn = qpa.num; /* ib_net32_t */ > cm_ptr->dst.port = cl_hton16(ia_ptr->hca_ptr->port_num); >- > cm_ptr->dst.lid = dapli_get_lid( ia_ptr->hca_ptr, > ia_ptr->hca_ptr->port_num ); > if (cm_ptr->dst.lid == 0) >@@ -197,6 +532,17 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > __FUNCTION__, 
__LINE__); > goto bail; > } >+ >+ /* set gid in network order */ >+ ibs = dapls_ib_query_gid( ia_ptr->hca_ptr, &cm_ptr->dst.gid ); >+ if ( ibs != IB_SUCCESS ) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "%s() dapls_ib_query_gid() returns '%s'\n", >+ __FUNCTION__,ib_get_err_str(ibs)); >+ goto bail; >+ } >+ > cm_ptr->dst.ia_address = ia_ptr->hca_ptr->hca_address; > cm_ptr->dst.p_size = cl_hton32(p_size); > >@@ -213,6 +559,8 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > iovec[1].buf = p_data; > iovec[1].len = p_size; > } >+ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, >write QP and private data\n"); > rc = WSASend (cm_ptr->socket, iovec, (p_size ? 2:1), >&len, 0, 0, NULL); > if ( rc || len != (p_size + sizeof(ib_qp_cm_t)) ) { > dapl_dbg_log(DAPL_DBG_TYPE_ERR, >@@ -225,17 +573,65 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > cm_ptr->dst.port, cm_ptr->dst.lid, > cm_ptr->dst.qpn, cm_ptr->dst.p_size ); > >+ /* queue up to work thread to avoid blocking consumer */ >+ cm_ptr->state = SCM_CONN_PENDING; >+ cm_ptr->hca = ia_ptr->hca_ptr; >+ cm_ptr->ep = ep_ptr; >+ dapli_cm_queue(cm_ptr); >+ return DAT_SUCCESS; >+bail: >+ /* close socket, free cm structure */ >+ dapli_cm_destroy(cm_ptr); >+ return DAT_INTERNAL_ERROR; >+} >+ >+ >+/* >+ * ACTIVE: exchange QP information, called from CR thread >+ */ >+void >+dapli_socket_connect_rtu(dp_ib_cm_handle_t cm_ptr) >+{ >+ DAPL_EP *ep_ptr = cm_ptr->ep; >+ DAPL_IA *ia_ptr = cm_ptr->ep->header.owner_ia; >+ int len, rc; >+ DWORD ioflags; >+ WSABUF iovec[1]; >+ short rtu_data = htons(0x0E0F); >+ ib_cm_events_t event = IB_CME_DESTINATION_REJECT; >+ ib_api_status_t ibs; >+ dapl_ibal_port_t *p_port; >+ dapl_ibal_ca_t *p_ca; >+ > /* read DST information into cm_ptr, overwrite SRC info */ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: recv peer >QP data\n"); >+ >+ iovec[0].buf = (char*)&cm_ptr->dst; >+ iovec[0].len = sizeof(ib_qp_cm_t); > ioflags = len = 0; > rc = WSARecv (cm_ptr->socket, iovec, 1, &len, &ioflags, 0, 0); >- if ( rc == 
SOCKET_ERROR || len != sizeof(ib_qp_cm_t) ) { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR,"connect read: >ERR %d rcnt=%d\n", >- WSAGetLastError(), len); >+ if ( rc == SOCKET_ERROR || len != sizeof(ib_qp_cm_t) || >+ ntohs(cm_ptr->dst.ver) >!= DSCM_VER ) >+ { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "connect_rtu read: ERR %d rcnt=%d >ver=%d\n", >+ WSAGetLastError(), len, cm_ptr->dst.ver); >+ goto bail; >+ } >+ >+ /* check for consumer reject */ >+ if (cm_ptr->dst.rej) { >+ dapl_dbg_log(DAPL_DBG_TYPE_CM, >+ " connect_rtu read: PEER REJ >reason=0x%x\n", >+ ntohs(cm_ptr->dst.rej)); >+ event = IB_CME_DESTINATION_REJECT_PRIVATE_DATA; > goto bail; > } > >- /* revert back to host byte ordering */ >+ /* convert peer response values to host order */ > cm_ptr->dst.port = cl_ntoh16(cm_ptr->dst.port); >+ cm_ptr->dst.lid = ntohs(cm_ptr->dst.lid); >+ cm_ptr->dst.qpn = cm_ptr->dst.qpn; > cm_ptr->dst.p_size = cl_ntoh32(cm_ptr->dst.p_size); > > dapl_dbg_log(DAPL_DBG_TYPE_EP, " connect: Rx DST: qpn >%x port %d " >@@ -245,15 +641,27 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > cm_ptr->dst.p_size, > >dapli_get_ip_addr_str(&cm_ptr->dst.ia_address,NULL)); > >+ /* save remote address information */ >+ dapl_os_memcpy( &ep_ptr->remote_ia_address, >+ &cm_ptr->dst.ia_address, >+ sizeof(ep_ptr->remote_ia_address)); >+ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, >+ " connect_rtu: DST %s port=0x%x lid=0x%x, >qpn=0x%x, psize=%d\n", >+ inet_ntoa(((struct sockaddr_in >*)&cm_ptr->dst.ia_address)->sin_addr), >+ cm_ptr->dst.port, cm_ptr->dst.lid, >+ cm_ptr->dst.qpn, cm_ptr->dst.p_size); >+ > /* validate private data size before reading */ >- if ( cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE ) { >+ if (cm_ptr->dst.p_size > IB_MAX_REP_PDATA_SIZE) { > dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- " connect read: psize (%d) wrong\n", >+ " connect_rtu read: psize (%d) wrong\n", > cm_ptr->dst.p_size ); > goto bail; > } > > /* read private data into cm_handle if any present */ >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket connected, read >private 
data\n"); > if ( cm_ptr->dst.p_size ) { > iovec[0].buf = cm_ptr->p_data; > iovec[0].len = cm_ptr->dst.p_size; >@@ -300,32 +708,29 @@ dapli_socket_connect ( DAPL_EP > *ep_ptr, > > ep_ptr->qp_state = IB_QP_STATE_RTS; > >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," connect_rtu: send RTU\n"); >+ > /* complete handshake after final QP state change */ > send(cm_ptr->socket, (const char *)&rtu_data, >sizeof(rtu_data), 0); > > /* init cm_handle and post the event with private data */ > ep_ptr->cm_handle = cm_ptr; >+ cm_ptr->state = SCM_CONNECTED; > dapl_dbg_log( DAPL_DBG_TYPE_EP," ACTIVE: connected!\n" ); > > dapl_evd_connection_callback( ep_ptr->cm_handle, > IB_CME_CONNECTED, > cm_ptr->p_data, > ep_ptr ); >- return DAT_SUCCESS; >- >+ return; > bail: > /* close socket, free cm structure and post error event */ >- if ( cm_ptr->socket >= 0 ) >- closesocket(cm_ptr->socket); >- >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); >- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */ >- >- dapl_evd_connection_callback( ep_ptr->cm_handle, >- IB_CME_LOCAL_FAILURE, >+ dapli_cm_destroy(cm_ptr); >+ dapls_ib_reinit_ep(ep_ptr); /* reset QP state */ >+ dapl_evd_connection_callback( NULL /*ep_ptr->cm_handle*/, >+ event, > NULL, > ep_ptr ); >- return DAT_INTERNAL_ERROR; > } > > >@@ -347,14 +752,12 @@ dapli_socket_listen ( DAPL_IA > *ia_ptr, > ia_ptr, serviceID, sp_ptr); > > /* Allocate CM and initialize */ >- if ((cm_ptr = dapl_os_alloc(sizeof(*cm_ptr))) == NULL) >+ cm_ptr = dapli_cm_create(); >+ if (cm_ptr == NULL) > return DAT_INSUFFICIENT_RESOURCES; > >- (void) dapl_os_memzero( cm_ptr, sizeof( *cm_ptr ) ); >- >- cm_ptr->socket = cm_ptr->l_socket = -1; > cm_ptr->sp = sp_ptr; >- cm_ptr->hca_ptr = ia_ptr->hca_ptr; >+ cm_ptr->hca = ia_ptr->hca_ptr; > > /* bind, listen, set sockopt, accept, exchange data */ > if ((cm_ptr->l_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { >@@ -406,12 +809,9 @@ dapli_socket_listen ( DAPL_IA > *ia_ptr, > /* set cm_handle for this service point, save listen socket */ > 
sp_ptr->cm_srvc_handle = cm_ptr; > >- /* add to SP->CR thread list */ >- dapl_llist_init_entry((DAPL_LLIST_ENTRY*)&cm_ptr->entry); >- dapl_os_lock( &cm_ptr->hca_ptr->ib_trans.lock ); >- >dapl_llist_add_tail((DAPL_LLIST_HEAD*)&cm_ptr->hca_ptr->ib_trans.list, >- (DAPL_LLIST_ENTRY*)&cm_ptr->entry, >(void*)cm_ptr); >- dapl_os_unlock(&cm_ptr->hca_ptr->ib_trans.lock); >+ /* queue up listen socket to process inbound CR's */ >+ cm_ptr->state = SCM_LISTEN; >+ dapli_cm_queue(cm_ptr); > > dapl_dbg_log( DAPL_DBG_TYPE_CM, > " listen: qual 0x%x cr %p s_fd %d\n", >@@ -421,10 +821,7 @@ dapli_socket_listen ( DAPL_IA > *ia_ptr, > bail: > dapl_dbg_log( DAPL_DBG_TYPE_CM, > " listen: ERROR on conn_qual >0x%x\n",serviceID); >- if ( cm_ptr->l_socket >= 0 ) >- closesocket( cm_ptr->l_socket ); >- >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); >+ dapli_cm_destroy(cm_ptr); > return dat_status; > } > >@@ -441,6 +838,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > int len; > DAT_RETURN dat_status = DAT_SUCCESS; > >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket_accept\n"); >+ > /* Allocate accept CM and initialize */ > if ((acm_ptr = dapl_os_alloc(sizeof(*acm_ptr))) == NULL) > return DAT_INSUFFICIENT_RESOURCES; >@@ -448,8 +847,9 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > (void) dapl_os_memzero( acm_ptr, sizeof( *acm_ptr ) ); > > acm_ptr->socket = -1; >+ acm_ptr->l_socket = -1; > acm_ptr->sp = cm_ptr->sp; >- acm_ptr->hca_ptr = cm_ptr->hca_ptr; >+ acm_ptr->hca = cm_ptr->hca; > > len = sizeof(acm_ptr->dst.ia_address); > acm_ptr->socket = accept(cm_ptr->l_socket, >@@ -464,27 +864,32 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > goto bail; > } > >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read >QP data\n"); >+ > /* read in DST QP info, IA address. 
check for private data */ > len = >recv(acm_ptr->socket,(char*)&acm_ptr->dst,sizeof(ib_qp_cm_t),0); >- if ( len != sizeof(ib_qp_cm_t) ) { >+ if ( len != sizeof(ib_qp_cm_t) || >ntohs(acm_ptr->dst.ver) != DSCM_VER ) >+ { > dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- " accept read: ERR %d, rcnt=%d\n", >- WSAGetLastError(), len); >+ " accept read: ERR %d, rcnt=%d ver=%d\n", >+ WSAGetLastError(), len, acm_ptr->dst.ver); > dat_status = DAT_INTERNAL_ERROR; > goto bail; > > } >- /* revert back to host byte ordering */ >+ /* convert accepted values to host byte ordering */ > acm_ptr->dst.port = cl_ntoh16(acm_ptr->dst.port); >+ acm_ptr->dst.lid = ntohs(acm_ptr->dst.lid); >+ acm_ptr->dst.qpn = acm_ptr->dst.qpn; > acm_ptr->dst.p_size = cl_ntoh32(acm_ptr->dst.p_size); > >- dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST >sizeof(ib_cm_t) %d qpn %x " >- "port %d lid 0x%x psize %d IP %s\n", >- sizeof(ib_qp_cm_t), >- cl_ntoh32(acm_ptr->dst.qpn), acm_ptr->dst.port, >+ dapl_dbg_log(DAPL_DBG_TYPE_EP, " accept: DST %s port 0x%x " >+ "lid 0x%x qpn 0x%x psize %d\n", >+ dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL), >+ acm_ptr->dst.port, > cl_ntoh16(acm_ptr->dst.lid), >- acm_ptr->dst.p_size, >- dapli_get_ip_addr_str(&acm_ptr->dst.ia_address,NULL)); >+ cl_ntoh32(acm_ptr->dst.qpn), >+ acm_ptr->dst.p_size); > > /* validate private data size before reading */ > if ( acm_ptr->dst.p_size > IB_MAX_REQ_PDATA_SIZE ) { >@@ -495,6 +900,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > goto bail; > } > >+ dapl_dbg_log(DAPL_DBG_TYPE_EP," socket accepted, read >private data\n"); >+ > /* read private data into cm_handle if any present */ > if ( acm_ptr->dst.p_size ) { > len = recv( acm_ptr->socket, >@@ -514,6 +921,8 @@ dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > p_data = acm_ptr->p_data; > } > >+ acm_ptr->state = SCM_ACCEPTING; >+ > /* trigger CR event and return SUCCESS */ > dapls_cr_callback( acm_ptr, > IB_CME_CONNECTION_REQUEST_PENDING, >@@ -521,153 +930,8 @@ 
dapli_socket_accept(ib_cm_srvc_handle_t cm_ptr) > acm_ptr->sp ); > > return DAT_SUCCESS; >- >-bail: >- if ( acm_ptr->socket >= 0 ) >- closesocket( acm_ptr->socket ); >- >- dapl_os_free( acm_ptr, sizeof( *acm_ptr ) ); >- return DAT_INTERNAL_ERROR; >-} >- >- >-static DAT_RETURN >-dapli_socket_accept_final( DAPL_EP *ep_ptr, >- DAPL_CR *cr_ptr, >- DAT_COUNT p_size, >- DAT_PVOID p_data ) >-{ >- DAPL_IA *ia_ptr = ep_ptr->header.owner_ia; >- dp_ib_cm_handle_t cm_ptr = cr_ptr->ib_cm_handle; >- ib_qp_cm_t qp_cm; >- WSABUF iovec[2]; >- int len, rc; >- short rtu_data = 0; >- ib_api_status_t ibs; >- ib_qp_attr_t qpa; >- dapl_ibal_port_t *p_port; >- dapl_ibal_ca_t *p_ca; >- >- dapl_dbg_log (DAPL_DBG_TYPE_EP, "%s() p_sz %d sock %d >port %d\n", >- __FUNCTION__,p_size,cm_ptr->socket, >- ia_ptr->hca_ptr->port_num); >- >- if (p_size > IB_MAX_REP_PDATA_SIZE) >- return DAT_LENGTH_ERROR; >- >- /* must have a accepted socket */ >- if ( cm_ptr->socket < 0 ) >- return DAT_INTERNAL_ERROR; >- >- /* modify QP to RTR and then to RTS with remote info >already read */ >- >- p_ca = (dapl_ibal_ca_t *) ia_ptr->hca_ptr->ib_hca_handle; >- p_port = dapli_ibal_get_port (p_ca, >(uint8_t)ia_ptr->hca_ptr->port_num); >- if (!p_port) >- { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- "%s() dapli_ibal_get_port() failed @ >line #%d\n", >- __FUNCTION__,__LINE__); >- goto bail; >- } >- >- dapl_dbg_log(DAPL_DBG_TYPE_EP, "%s() DST: qpn %x port >%d lid %x\n", >- __FUNCTION__, >- cl_ntoh32(cm_ptr->dst.qpn), >- cm_ptr->dst.port, >- cl_ntoh16(cm_ptr->dst.lid)); >- >- /* modify QP to RTR and then to RTS with remote info */ >- >- ibs = dapls_modify_qp_state_to_rtr( ep_ptr->qp_handle, >- cm_ptr->dst.qpn, >- cm_ptr->dst.lid, >- p_port ); >- if (ibs != IB_SUCCESS) >- { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- "%s() QP --> RTR failed @ line #%d\n", >- __FUNCTION__,__LINE__); >- goto bail; >- } >- >- if ( dapls_modify_qp_state_to_rts( ep_ptr->qp_handle ) ) >- { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- "%s() QP --> RTS failed @ line 
#%d\n", >- __FUNCTION__,__LINE__); >- goto bail; >- } >- >- ep_ptr->qp_state = IB_QP_STATE_RTS; >- >- /* determine QP & port numbers */ >- ibs = ib_query_qp(ep_ptr->qp_handle, &qpa); >- if (ibs != IB_SUCCESS) >- { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- " ib_query_qp() ERR %s\n", >ib_get_err_str(ibs)); >- goto bail; >- } >- >- /* Send QP info, IA address, and private data */ >- qp_cm.qpn = qpa.num; /* ib_net32_t */ >- qp_cm.port = ia_ptr->hca_ptr->port_num; >- qp_cm.lid = dapli_get_lid( ia_ptr->hca_ptr, >ia_ptr->hca_ptr->port_num ); >- qp_cm.ia_address = ia_ptr->hca_ptr->hca_address; >- qp_cm.p_size = p_size; >- >- dapl_dbg_log(DAPL_DBG_TYPE_CM, >- "%s()\n Tx QP info: qpn %x port %d lid 0x%x >p_sz %d IP %s\n", >- __FUNCTION__, cl_ntoh32(qp_cm.qpn), qp_cm.port, >- cl_ntoh16(qp_cm.lid), qp_cm.p_size, >- dapli_get_ip_addr_str(&qp_cm.ia_address,NULL)); >- >- /* network byte-ordering - QPN & LID already are */ >- qp_cm.p_size = cl_hton32(qp_cm.p_size); >- qp_cm.port = cl_hton16(qp_cm.port); >- >- iovec[0].buf = (char*)&qp_cm; >- iovec[0].len = sizeof(ib_qp_cm_t); >- if (p_size) { >- iovec[1].buf = p_data; >- iovec[1].len = p_size; >- } >- rc = WSASend( cm_ptr->socket, iovec, (p_size ? 
2:1), >&len, 0, 0, 0 ); >- if (rc || len != (p_size + sizeof(ib_qp_cm_t))) { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- " accept_final: ERR %d, wcnt=%d\n", >- WSAGetLastError(), len); >- goto bail; >- } >- dapl_dbg_log(DAPL_DBG_TYPE_EP, >- " accept_final: SRC qpn %x port %d lid >0x%x psize %d\n", >- qp_cm.qpn, qp_cm.port, qp_cm.lid, qp_cm.p_size ); >- >- /* complete handshake after final QP state change */ >- len = recv(cm_ptr->socket, (char*)&rtu_data, >sizeof(rtu_data), 0); >- if ( len != sizeof(rtu_data) || ntohs(rtu_data) != 0x0e0f ) { >- dapl_dbg_log(DAPL_DBG_TYPE_ERR, >- " accept_final: ERR %d, rcnt=%d >rdata=%x\n", >- WSAGetLastError(), len, ntohs(rtu_data) ); >- goto bail; >- } >- >- /* final data exchange if remote QP state is good to go */ >- dapl_dbg_log( DAPL_DBG_TYPE_EP," PASSIVE: connected!\n" ); >- >- dapls_cr_callback ( cm_ptr, IB_CME_CONNECTED, NULL, >cm_ptr->sp ); >- >- return DAT_SUCCESS; >- > bail: >- dapl_dbg_log( DAPL_DBG_TYPE_ERR," accept_final: ERR >!QP_RTR_RTS \n"); >- if ( cm_ptr->socket >= 0 ) >- closesocket( cm_ptr->socket ); >- >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); >- dapls_ib_reinit_ep( ep_ptr ); /* reset QP state */ >- >+ dapli_cm_destroy(acm_ptr); > return DAT_INTERNAL_ERROR; > } > >@@ -747,11 +1011,7 @@ dapls_ib_disconnect ( > dapl_dbg_log (DAPL_DBG_TYPE_EP, > "dapls_ib_disconnect(ep_handle %p >....)\n", ep_ptr); > >- if ( cm_ptr->socket >= 0 ) { >- closesocket( cm_ptr->socket ); >- cm_ptr->socket = -1; >- } >- >+#if 0 // XXX > /* disconnect QP ala transition to RESET state */ > ib_status = dapls_modify_qp_state_to_reset (ep_ptr->qp_handle); > >@@ -776,15 +1036,18 @@ dapls_ib_disconnect ( > NULL, > ep_ptr ); > ep_ptr->cm_handle = NULL; >- dapl_os_free( cm_ptr, sizeof( *cm_ptr ) ); > } >- >+#endif > /* modify QP state --> INIT */ > dapls_ib_reinit_ep(ep_ptr); > >+ if (cm_ptr == NULL) > return DAT_SUCCESS; >+ else >+ return dapli_socket_disconnect(cm_ptr); > } > >+ > /* > * dapls_ib_disconnect_clean > * >@@ -874,14 +1137,20 @@ 
dapls_ib_remove_conn_listener ( > if ( cm_ptr != NULL ) { > if ( cm_ptr->l_socket >= 0 ) { > closesocket( cm_ptr->l_socket ); >+ cm_ptr->l_socket = -1; >+ } >+ if ( cm_ptr->socket >= 0 ) { >+ closesocket( cm_ptr->socket ); > cm_ptr->socket = -1; > } > /* cr_thread will free */ > sp_ptr->cm_srvc_handle = NULL; >+ _write(g_scm_pipe[1], "w", sizeof "w"); > } > return DAT_SUCCESS; > } > >+ > /* > * dapls_ib_accept_connection > * >@@ -928,7 +1197,7 @@ dapls_ib_accept_connection ( > return status; > } > >- return ( dapli_socket_accept_final(ep_ptr, cr_ptr, >p_size, p_data) ); >+ return ( dapli_socket_accept_usr(ep_ptr, cr_ptr, >p_size, p_data) ); > } > > >@@ -948,27 +1217,39 @@ dapls_ib_accept_connection ( > * DAT_INTERNAL_ERROR > * > */ >+ > DAT_RETURN > dapls_ib_reject_connection ( >- IN dp_ib_cm_handle_t ib_cm_handle, >+ IN dp_ib_cm_handle_t cm_ptr, > IN int reject_reason, >- IN DAT_COUNT private_data_size, >- IN const DAT_PVOID private_data) >+ IN DAT_COUNT psize, >+ IN const DAT_PVOID pdata) > { >- ib_cm_srvc_handle_t cm_ptr = ib_cm_handle; >+ WSABUF iovec[1]; >+ int len; > > dapl_dbg_log (DAPL_DBG_TYPE_EP, >- "dapls_ib_reject_connection(cm_handle %p >reason %x)\n", >- ib_cm_handle, reject_reason ); >- >- /* just close the socket and return */ >- if ( cm_ptr->socket > 0 ) { >- closesocket( cm_ptr->socket ); >+ " reject(cm %p reason %x pdata %p psize %d)\n", >+ cm_ptr, reject_reason, pdata, psize ); >+ >+ /* write reject data to indicate reject */ >+ if (cm_ptr->socket >= 0) { >+ cm_ptr->dst.rej = (uint16_t)reject_reason; >+ cm_ptr->dst.rej = cl_hton16(cm_ptr->dst.rej); >+ iovec[0].buf = (char*)&cm_ptr->dst; >+ iovec[0].len = sizeof(ib_qp_cm_t); >+ (void) WSASend (cm_ptr->socket, iovec, 1, >&len, 0, 0, NULL); >+ closesocket(cm_ptr->socket); > cm_ptr->socket = -1; > } >+ >+ /* cr_thread will destroy CR */ >+ cm_ptr->state = SCM_REJECTED; >+ _write(g_scm_pipe[1], "w", sizeof "w"); > return DAT_SUCCESS; > } > >+ > /* > * dapls_ib_cm_remote_addr > * >@@ -1157,7 +1438,7 
@@ dapls_ib_get_dat_event ( > > > /* >- * dapls_ib_get_dat_event >+ * dapls_ib_get_cm_event > * > * Return a DAT connection event given a provider CM event. > * >@@ -1189,12 +1470,16 @@ dapls_ib_get_cm_event ( > } > #endif /* NOT_USED */ > >-/* async CR processing thread to avoid blocking applications */ >+/* outbound/inbound CR processing thread to avoid blocking >applications */ >+ >+#define SCM_MAX_CONN (8 * sizeof(fd_set)) >+ > void cr_thread(void *arg) > { > struct dapl_hca *hca_ptr = arg; > ib_cm_srvc_handle_t cr, next_cr; > int max_fd, rc; >+ char rbuf[2]; > fd_set rfd, rfds; > struct timeval to; > >@@ -1202,10 +1487,12 @@ void cr_thread(void *arg) > > dapl_os_lock( &hca_ptr->ib_trans.lock ); > hca_ptr->ib_trans.cr_state = IB_THREAD_RUN; >+ > while (hca_ptr->ib_trans.cr_state == IB_THREAD_RUN) { > > FD_ZERO( &rfds ); >- max_fd = -1; >+ FD_SET(g_scm_pipe[0], &rfds); >+ max_fd = g_scm_pipe[0]; > > if >(!dapl_llist_is_empty((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list)) > next_cr = dapl_llist_peek_head((DAPL_LLIST_HEAD*) >@@ -1230,32 +1517,46 @@ void cr_thread(void *arg) > continue; > } > >- FD_SET( cr->l_socket, &rfds ); /* add to select set */ >- if ( cr->l_socket > max_fd ) >+ if (cr->socket > SCM_MAX_CONN-1) { >+ dapl_dbg_log(DAPL_DBG_TYPE_ERR, >+ "SCM ERR: cr->socket(%d) exceeded >FD_SETSIZE %d\n", >+ cr->socket,SCM_MAX_CONN-1); >+ continue; >+ } >+ FD_SET( cr->socket, &rfds ); /* add to select SET */ >+ if ( cr->socket > max_fd ) > max_fd = cr->l_socket; > > /* individual select poll to check for work */ > FD_ZERO(&rfd); >- FD_SET(cr->l_socket, &rfd); >+ FD_SET(cr->socket, &rfd); > dapl_os_unlock(&hca_ptr->ib_trans.lock); > > to.tv_sec = 0; > to.tv_usec = 0; /* wakeup and check destroy */ > > /* block waiting for Rx data */ >- if (select(cr->l_socket+1,&rfd,NULL,NULL,&to) == >SOCKET_ERROR) { >+ if (select(cr->socket+1,&rfd,NULL,NULL,&to) == >SOCKET_ERROR) { > rc = WSAGetLastError(); > if ( rc != SOCKET_ERROR /*WSAENOTSOCK*/ ) > { > dapl_dbg_log 
(DAPL_DBG_TYPE_ERR/*CM*/, > " thread: select(sock %d) ERR >%d on cr %p\n", >- cr->l_socket, rc, cr); >+ cr->socket, rc, cr); >+ } >+ closesocket(cr->socket); >+ cr->socket = -1; >+ } else if (FD_ISSET(cr->socket,&rfd)) { >+ if (cr->socket > 0) { >+ if (cr->state == SCM_LISTEN) >+ dapli_socket_accept(cr); >+ else if (cr->state == SCM_ACCEPTED) >+ dapli_socket_accept_rtu(cr); >+ else if (cr->state == SCM_CONN_PENDING) >+ dapli_socket_connect_rtu(cr); >+ else if (cr->state == SCM_CONNECTED) >+ dapli_socket_disconnect(cr); > } >- closesocket(cr->l_socket); >- cr->l_socket = -1; >- } else if (FD_ISSET(cr->l_socket,&rfd) && >dapli_socket_accept(cr)) { >- closesocket(cr->l_socket); >- cr->l_socket = -1; > } > dapl_os_lock( &hca_ptr->ib_trans.lock ); > next_cr = dapl_llist_next_entry((DAPL_LLIST_HEAD*) >@@ -1263,9 +1564,19 @@ void cr_thread(void *arg) > >(DAPL_LLIST_ENTRY*)&cr->entry ); > } > dapl_os_unlock( &hca_ptr->ib_trans.lock ); >+ > to.tv_sec = 0; > to.tv_usec = 100000; /* wakeup and check destroy */ >+ > (void) select(max_fd+1, &rfds, NULL, NULL, &to); >+ >+ /* if pipe data consume - used to wake this thread up */ >+ if (FD_ISSET(g_scm_pipe[0],&rfds)) { >+ dapl_dbg_log(DAPL_DBG_TYPE_CM," cr_thread() >read pipe data\n"); >+printf(" cr_thread() read pipe data\n"); >+ _read(g_scm_pipe[0], rbuf, 2); >+printf(" cr_thread() Finished read pipe data\n"); >+ } > dapl_os_lock( &hca_ptr->ib_trans.lock ); > } > dapl_os_unlock( &hca_ptr->ib_trans.lock ); >diff --git a/dapl/ibal-scm/dapl_ibal-scm_util.c >b/dapl/ibal-scm/dapl_ibal-scm_util.c >index 8e5f8ac..06bc704 100644 >--- a/dapl/ibal-scm/dapl_ibal-scm_util.c >+++ b/dapl/ibal-scm/dapl_ibal-scm_util.c >@@ -52,6 +52,7 @@ static const char rcsid[] = "$Id: $"; > #include "dapl.h" > #include "dapl_adapter_util.h" > #include "dapl_ibal_util.h" >+#include "dapl_ibal_name_service.h" > > #include <stdio.h> > #include <stdlib.h> >@@ -61,9 +62,12 @@ static const char rcsid[] = "$Id: $"; > #include <winsock2.h> > #include <ws2tcpip.h> > 
#include <io.h> >+#include <fcntl.h> > >+extern void cr_thread(void *arg); > > int g_dapl_loopback_connection = 0; >+int g_scm_pipe[2]; > > #ifdef NOT_USED > >@@ -132,22 +136,55 @@ DAT_RETURN dapli_init_sock_cm ( IN >DAPL_HCA *hca_ptr ) > > dapl_dbg_log (DAPL_DBG_TYPE_UTIL, " %s(): >%p\n",__FUNCTION__,hca_ptr ); > >- /* set inline max with enviroment or default */ >+ /* set RC tunables via enviroment or default */ > hca_ptr->ib_trans.max_inline_send = >- dapl_os_get_env_val ( "DAPL_MAX_INLINE", >INLINE_SEND_DEFAULT ); >+ dapl_os_get_env_val("DAPL_MAX_INLINE", >INLINE_SEND_DEFAULT); >+#if 0 >+ hca_ptr->ib_trans.ack_retry = >+ dapl_os_get_env_val("DAPL_ACK_RETRY", SCM_ACK_RETRY); >+ hca_ptr->ib_trans.ack_timer = >+ dapl_os_get_env_val("DAPL_ACK_TIMER", SCM_ACK_TIMER); >+ hca_ptr->ib_trans.rnr_retry = >+ dapl_os_get_env_val("DAPL_RNR_RETRY", SCM_RNR_RETRY); >+ hca_ptr->ib_trans.rnr_timer = >+ dapl_os_get_env_val("DAPL_RNR_TIMER", SCM_RNR_TIMER); >+ hca_ptr->ib_trans.global = >+ dapl_os_get_env_val("DAPL_GLOBAL_ROUTING", SCM_GLOBAL); >+ hca_ptr->ib_trans.hop_limit = >+ dapl_os_get_env_val("DAPL_HOP_LIMIT", SCM_HOP_LIMIT); >+ hca_ptr->ib_trans.tclass = >+ dapl_os_get_env_val("DAPL_TCLASS", SCM_TCLASS); >+#endif > > /* initialize cr_list lock */ > dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.lock); > if (dat_status != DAT_SUCCESS) > { > dapl_dbg_log (DAPL_DBG_TYPE_ERR, >- " open_hca: failed to init lock\n"); >+ "%s() failed to init cr_list lock\n", >__FUNCTION__); > return DAT_INTERNAL_ERROR; > } > >+#if 0 >+ /* initialize cq_lock */ >+ dat_status = dapl_os_lock_init(&hca_ptr->ib_trans.cq_lock); >+ if (dat_status != DAT_SUCCESS) { >+ dapl_log(DAPL_DBG_TYPE_ERR, >+ "%s() failed to init cq_lock\n", >__FUNCTION__); >+ return DAT_INTERNAL_ERROR; >+ } >+#endif >+ > /* initialize CM list for listens on this HCA */ > >dapl_llist_init_head((DAPL_LLIST_HEAD*)&hca_ptr->ib_trans.list); > >+ /* create pipe communication endpoints */ >+ if (_pipe(g_scm_pipe, 256, O_TEXT)) { >+ 
dapl_dbg_log (DAPL_DBG_TYPE_ERR, >+ "%s() failed to create thread\n", >__FUNCTION__); >+ return DAT_INTERNAL_ERROR; >+ } >+ > /* create thread to process inbound connect request */ > hca_ptr->ib_trans.cr_state = IB_THREAD_INIT; > dat_status = dapl_os_thread_create(cr_thread, >@@ -199,6 +236,7 @@ DAT_RETURN dapli_close_sock_cm ( IN >DAPL_HCA *hca_ptr ) > > /* destroy cr_thread and lock */ > hca_ptr->ib_trans.cr_state = IB_THREAD_CANCEL; >+ > while (hca_ptr->ib_trans.cr_state != IB_THREAD_EXIT) { > dapl_dbg_log(DAPL_DBG_TYPE_UTIL, > " close_hca: waiting for cr_thread\n"); > > > >_______________________________________________ >ofw mailing list >[email protected] >http://lists.openfabrics.org/cgi-bin/mailman/listinfo/ofw > _______________________________________________ general mailing list [email protected] http://lists.openfabrics.org/cgi-bin/mailman/listinfo/general To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general
