> > Hi Jianfeng,
> >
> >> -----Original Message-----
> >> From: Tan, Jianfeng
> >> Sent: Tuesday, January 16, 2018 8:11 AM
> >> To: Ananyev, Konstantin <konstantin.anan...@intel.com>; dev@dpdk.org;
> >> Burakov, Anatoly <anatoly.bura...@intel.com>
> >> Cc: Richardson, Bruce <bruce.richard...@intel.com>; tho...@monjalon.net
> >> Subject: Re: [PATCH v2 3/4] eal: add synchronous multi-process
> >> communication
> >>
> >> Thank you, Konstantin and Anatoly firstly. Other comments are well
> >> received and I'll send out a new version.
> >>
> >>
> >> On 1/16/2018 8:00 AM, Ananyev, Konstantin wrote:
> >>>> We need the synchronous way for multi-process communication,
> >>>> i.e., blockingly waiting for reply message when we send a request
> >>>> to the peer process.
> >>>>
> >>>> We add two APIs rte_eal_mp_request() and rte_eal_mp_reply() for
> >>>> such use case. By invoking rte_eal_mp_request(), a request message
> >>>> is sent out, and then it waits there for a reply message. The
> >>>> timeout is hard-coded 5 Sec. And the replied message will be copied
> >>>> in the parameters of this API so that the caller can decide how
> >>>> to translate those information (including params and fds). Note
> >>>> if a primary process owns multiple secondary processes, this API
> >>>> will fail.
> >>>>
> >>>> The API rte_eal_mp_reply() is always called by an mp action handler.
> >>>> Here we add another parameter for rte_eal_mp_t so that the action
> >>>> handler knows which peer address to reply.
> >>>>
> >>>> We use mutex in rte_eal_mp_request() to guarantee that only one
> >>>> request is on the fly for one pair of processes.
> >>> You don't need to do things in such strange and restrictive way.
> >>> Instead you can do something like that:
> >>> 1) Introduce new struct, list for it and mutex
> >>> struct sync_request {
> >>>       int reply_received;
> >>>       char dst[PATH_MAX];
> >>>       char reply[...];
> >>>       LIST_ENTRY(sync_request) next;
> >>> };
> >>>
> >>> static struct {
> >>>       LIST_HEAD(list, sync_request);
> >>>       pthread_mutex_t lock;
> >>>       pthread_cond_t cond;
> >>> } sync_requests;
> >>>
> >>> 2) then at request() call:
> >>>   Grab sync_requests.lock
> >>>   Check do we already have a pending request for that destination,
> >>>   If yes - then release the lock and return with error.
> >>>   - allocate and init new sync_request struct, set reply_received=0
> >>>   - do send_msg()
> >>>   - then in a cycle:
> >>>     pthread_cond_timedwait(&sync_requests.cond, &sync_requests.lock,
> >>>     &timespec);
> >>>   - at return from it check if sync_request.reply_received == 1, if not
> >>>     check if timeout expired and either return a failure or go to the start
> >>>     of the cycle.
> >>>
> >>> 3) at mp_handler() if REPLY received - grab sync_requests.lock,
> >>>    search through sync_requests.list for dst[],
> >>>    if found, then set its reply_received=1, copy the received message
> >>>    into reply
> >>>    and call pthread_cond_broadcast(&sync_requests.cond);
> >> The only benefit I can see is that now the sender can request to
> >> multiple receivers at the same time. And it makes things more
> >> complicated. Do we really need this?
> > The benefit is that while one thread is blocked waiting for response,
> > your mp_handler can still receive and handle other messages.
>
> This can already be done in the original implementation. mp_handler
> listens for msg, request from the other peer(s), and replies the
> requests, which is not affected.
>
> > Plus as you said - other threads can keep sending messages.
>
> For this one, in the original implementation, other threads can still
> send msg, but not request. I suppose the request is not in a fast path,
> why we care to make it fast?
>
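The three-step proposal quoted above (a pending-request list, one mutex, one condition variable) can be sketched roughly as follows. This is only an illustration under stated assumptions, not code from the patch or from DPDK: the names sync_request_wait(), sync_reply_deliver() and loopback_peer(), the DST_MAX and REPLY_MAX sizes, and the millisecond timeout parameter are all hypothetical, and the actual send_msg() call is left as a comment.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/queue.h>
#include <time.h>

#define DST_MAX   108	/* hypothetical size for a peer socket path */
#define REPLY_MAX 256	/* hypothetical size for a reply payload */

struct sync_request {
	int reply_received;
	char dst[DST_MAX];
	char reply[REPLY_MAX];
	LIST_ENTRY(sync_request) next;
};

static struct {
	LIST_HEAD(req_list, sync_request) list;
	pthread_mutex_t lock;
	pthread_cond_t cond;
} sync_requests = {
	.list = LIST_HEAD_INITIALIZER(sync_requests.list),
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.cond = PTHREAD_COND_INITIALIZER,
};

/* Caller must hold sync_requests.lock. */
static struct sync_request *find_pending(const char *dst)
{
	struct sync_request *r;

	LIST_FOREACH(r, &sync_requests.list, next)
		if (strcmp(r->dst, dst) == 0)
			return r;
	return NULL;
}

/* Step 2: register a request for dst and wait for its reply.
 * Returns 0 on success, -EBUSY if dst already has a pending request,
 * -ETIMEDOUT if no reply arrived within timeout_ms. */
int sync_request_wait(const char *dst, char *reply, size_t reply_len,
		      int timeout_ms)
{
	struct sync_request req;
	struct timespec deadline;
	int ret = 0;

	assert(dst != NULL && reply != NULL);
	memset(&req, 0, sizeof(req));
	snprintf(req.dst, sizeof(req.dst), "%s", dst);

	/* Absolute deadline; condition variables use CLOCK_REALTIME by default. */
	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += timeout_ms / 1000;
	deadline.tv_nsec += (long)(timeout_ms % 1000) * 1000000L;
	if (deadline.tv_nsec >= 1000000000L) {
		deadline.tv_sec++;
		deadline.tv_nsec -= 1000000000L;
	}

	pthread_mutex_lock(&sync_requests.lock);
	if (find_pending(dst) != NULL) {
		pthread_mutex_unlock(&sync_requests.lock);
		return -EBUSY;
	}
	LIST_INSERT_HEAD(&sync_requests.list, &req, next);

	/* The real code would call send_msg() here. */

	/* The loop guards against spurious wakeups and wakeups meant for
	 * other requests; the wait releases the lock while sleeping. */
	while (!req.reply_received && ret == 0)
		ret = pthread_cond_timedwait(&sync_requests.cond,
					     &sync_requests.lock, &deadline);

	LIST_REMOVE(&req, next);
	pthread_mutex_unlock(&sync_requests.lock);

	if (!req.reply_received)
		return -ret;	/* -ETIMEDOUT on expiry */
	snprintf(reply, reply_len, "%s", req.reply);
	return 0;
}

/* Step 3: called from the message handler when a REPLY from src arrives.
 * Returns 0 if a waiter was found and woken, -1 otherwise. */
int sync_reply_deliver(const char *src, const char *msg)
{
	struct sync_request *r;
	int ret = -1;

	pthread_mutex_lock(&sync_requests.lock);
	r = find_pending(src);
	if (r != NULL) {
		snprintf(r->reply, sizeof(r->reply), "%s", msg);
		r->reply_received = 1;
		pthread_cond_broadcast(&sync_requests.cond);
		ret = 0;
	}
	pthread_mutex_unlock(&sync_requests.lock);
	return ret;
}

/* Stand-in for a remote peer: retries until a waiter for arg exists. */
void *loopback_peer(void *arg)
{
	const struct timespec pause = { 0, 1000000L };

	while (sync_reply_deliver((const char *)arg, "pong") != 0)
		nanosleep(&pause, NULL);
	return NULL;
}
```

Because the waiter sleeps on the condition variable rather than inside a blocking read on the socket, the handler thread stays free to receive and dispatch unrelated messages, and requests to different destinations can be outstanding at the same time.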
+int
+rte_eal_mp_request(const char *action_name,
+		   void *params,
+		   int len_p,
+		   int fds[],
+		   int fds_in,
+		   int fds_out)
+{
+	int i, j;
+	int sockfd;
+	int nprocs;
+	int ret = 0;
+	struct mp_msghdr *req;
+	struct timeval tv;
+	char buf[MAX_MSG_LENGTH];
+	struct mp_msghdr *hdr;
+
+	RTE_LOG(DEBUG, EAL, "request: %s\n", action_name);
+
+	if (fds_in > SCM_MAX_FD || fds_out > SCM_MAX_FD) {
+		RTE_LOG(ERR, EAL, "Cannot send more than %d FDs\n", SCM_MAX_FD);
+		rte_errno = -E2BIG;
+		return 0;
+	}
+
+	req = format_msg(action_name, params, len_p, fds_in, MP_REQ);
+	if (req == NULL)
+		return 0;
+
+	if ((sockfd = open_unix_fd(0)) < 0) {
+		free(req);
+		return 0;
+	}
+
+	tv.tv_sec = 5;  /* 5 Secs Timeout */
+	tv.tv_usec = 0;
+	if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO,
+			(const void *)&tv, sizeof(struct timeval)) < 0)
+		RTE_LOG(INFO, EAL, "Failed to set recv timeout\n");

If you set it just for one call, why do you not restore it?
Also I don't think it is a good idea to change it here -
if you make the timeout a parameter value, then it could be overwritten
by different threads.

+
+	/* Only allow one req at a time */
+	pthread_mutex_lock(&mp_mutex_request);
+
+	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+		nprocs = 0;
+		for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+			if (!mp_sec_sockets[i]) {
+				j = i;
+				nprocs++;
+			}
+
+		if (nprocs > 1) {
+			RTE_LOG(ERR, EAL,
+				"multi secondary processes not supported\n");
+			goto free_and_ret;
+		}
+
+		ret = send_msg(sockfd, mp_sec_sockets[j], req, fds);

As I remember - sendmsg() is also a blocking call, so under some conditions
you can stall there forever.
As mp_mutex_request is still held, the next rte_eal_mp_request() will also
block forever here.

+	} else
+		ret = send_msg(sockfd, eal_mp_unix_path(), req, fds);
+
+	if (ret == 0) {
+		RTE_LOG(ERR, EAL, "failed to send request: %s\n", action_name);
+		ret = -1;
+		goto free_and_ret;
+	}
+
+	ret = read_msg(sockfd, buf, MAX_MSG_LENGTH, fds, fds_out, NULL);

If the message you receive is not the reply you are expecting, it will simply
be dropped - mp_handler() would never process it.

+	if (ret > 0) {
+		hdr = (struct mp_msghdr *)buf;
+		if (hdr->len_params == len_p)
+			memcpy(params, hdr->params, len_p);
+		else {
+			RTE_LOG(ERR, EAL, "invalid reply\n");
+			ret = 0;
+		}
+	}
+
+free_and_ret:
+	free(req);
+	close(sockfd);
+	pthread_mutex_unlock(&mp_mutex_request);
+	return ret;
+}

All of the above makes me think that the current implementation is erroneous
and needs to be reworked.
Konstantin
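
On the SO_RCVTIMEO comment above: one possible way to get a per-call receive timeout without modifying socket state is to poll() before reading. The sketch below is an illustration only; neither the patch nor the review proposes it, and recv_with_timeout() is a hypothetical helper name. It uses plain recv() and so does not handle the fd passing that read_msg() does.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: wait up to timeout_ms for data, then receive it.
 * The timeout lives in this call only, so no socket option is changed
 * and concurrent callers cannot overwrite each other's value.
 * Returns the number of bytes received, -ETIMEDOUT, or -errno. */
ssize_t recv_with_timeout(int sockfd, void *buf, size_t len, int timeout_ms)
{
	struct pollfd pfd = { .fd = sockfd, .events = POLLIN };
	ssize_t n;
	int rc;

	assert(buf != NULL);

	/* Note: a retry after EINTR restarts the full timeout. */
	do {
		rc = poll(&pfd, 1, timeout_ms);
	} while (rc < 0 && errno == EINTR);

	if (rc == 0)
		return -ETIMEDOUT;
	if (rc < 0)
		return -errno;

	n = recv(sockfd, buf, len, 0);
	return n < 0 ? -errno : n;
}
```

This addresses only the timeout concern; a blocking send while holding the mutex, and replies being consumed outside mp_handler(), would still need the kind of rework discussed above.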