Hi Jianfeng, > -----Original Message----- > From: Tan, Jianfeng > Sent: Thursday, November 30, 2017 6:44 PM > To: dev@dpdk.org > Cc: Burakov, Anatoly <anatoly.bura...@intel.com>; Richardson, Bruce > <bruce.richard...@intel.com>; Ananyev, Konstantin > <konstantin.anan...@intel.com>; tho...@monjalon.net; Tan, Jianfeng > <jianfeng....@intel.com> > Subject: [PATCH 1/3] eal: add channel for multi-process communication > > Previously, there are three channels for multi-process > (i.e., primary/secondary) communication. > 1. Config-file based channel, in which, the primary process writes > info into a pre-defined config file, and the secondary process > reads info out. > 2. vfio submodule has its own channel based on unix socket for the > secondary process to get container fd and group fd from the > primary process. > 3. pdump submodule also has its own channel based on unix socket for > packet dump. > > It'll be good to have a generic communication channel for multi-process > communication to accommodate the requirements including: > a. Secondary wants to send info to primary, for example, secondary > would like to send request (about some specific vdev) to primary. > b. Sending info at any time, instead of just initialization time. > c. Share FDs with the other side, for vdev like vhost, related FDs > (memory region, kick) should be shared. > d. A send message request needs the other side to respond immediately. > > This patch proposes to create a communication channel, as a unix > socket connection, for above requirements. Primary will listen on > the unix socket; secondary will connect to this socket to talk.
Kind of a generic question - why do you need a connection-oriented socket here? Why wouldn't a connection-less socket be enough? In that case you don't need to do listen/accept, and you don't need the epoll() loop either. Instead, with a connection-less socket you can just use a blocking recvmsg() inside mp_handler(). > > Three new APIs are added: > > 1. rte_eal_mp_action_register is used to register an action, > indexed by a string; if the calling component wants to > respond to the messages from the corresponding component in > its primary process or secondary processes. > 2. rte_eal_mp_action_unregister is used to unregister the action > if the calling component does not want to respond to the messages. I think you need some sort of synchronization between action_register/unregister() and action_process() - mutex_lock or so. Another thing - as I understand it, you use a string as the message/action identification? I think you need to limit its max length. Konstantin > 3. rte_eal_mp_sendmsg is used to send a message. 
> > Signed-off-by: Jianfeng Tan <jianfeng....@intel.com> > --- > lib/librte_eal/common/eal_common_proc.c | 497 > ++++++++++++++++++++++++++++++++ > lib/librte_eal/common/eal_filesystem.h | 18 ++ > lib/librte_eal/common/eal_private.h | 10 + > lib/librte_eal/common/include/rte_eal.h | 68 +++++ > lib/librte_eal/linuxapp/eal/eal.c | 9 + > lib/librte_eal/rte_eal_version.map | 22 ++ > 6 files changed, 624 insertions(+) > > diff --git a/lib/librte_eal/common/eal_common_proc.c > b/lib/librte_eal/common/eal_common_proc.c > index 60526ca..5d0a095 100644 > --- a/lib/librte_eal/common/eal_common_proc.c > +++ b/lib/librte_eal/common/eal_common_proc.c > @@ -33,8 +33,21 @@ > #include <stdio.h> > #include <fcntl.h> > #include <stdlib.h> > +#include <sys/types.h> > +#include <sys/socket.h> > +#include <sys/epoll.h> > +#include <limits.h> > +#include <unistd.h> > +#include <sys/un.h> > +#include <errno.h> > +#include <pthread.h> > + > +#include <rte_log.h> > #include <rte_eal.h> > +#include <rte_lcore.h> > +#include <rte_common.h> > > +#include "eal_private.h" > #include "eal_filesystem.h" > #include "eal_internal_cfg.h" > > @@ -59,3 +72,487 @@ rte_eal_primary_proc_alive(const char *config_file_path) > > return !!ret; > } > + > +struct action_entry { > + TAILQ_ENTRY(action_entry) next; /**< Next attached action entry */ > + > +#define MAX_ACTION_NAME_LEN 64 > + char action_name[MAX_ACTION_NAME_LEN]; > + rte_eal_mp_t action; > +}; > + > +/** Double linked list of actions. 
*/ > +TAILQ_HEAD(action_entry_list, action_entry); > + > +static struct action_entry_list action_entry_list = > + TAILQ_HEAD_INITIALIZER(action_entry_list); > + > +static struct action_entry * > +find_action_entry_by_name(const char *name) > +{ > + int len = strlen(name); > + struct action_entry *entry; > + > + TAILQ_FOREACH(entry, &action_entry_list, next) { > + if (strncmp(entry->action_name, name, len) == 0) > + break; > + } > + > + return entry; > +} > + > +int > +rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action) > +{ > + struct action_entry *entry = malloc(sizeof(struct action_entry)); > + > + if (entry == NULL) > + return -ENOMEM; > + > + if (find_action_entry_by_name(action_name) != NULL) > + return -EEXIST; > + > + strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN); > + entry->action = action; > + TAILQ_INSERT_TAIL(&action_entry_list, entry, next); > + return 0; > +} > + > +void > +rte_eal_mp_action_unregister(const char *name) > +{ > + struct action_entry *entry = find_action_entry_by_name(name); > + > + TAILQ_REMOVE(&action_entry_list, entry, next); > + free(entry); > +} > + > +/* The maximum amount of fd for one recvmsg/sendmsg */ > +#define SCM_MAX_FD 253 > +#define MAX_SECONDARY_PROCS 8 > +#define MAX_MESSAGE_LENGTH 1024 > + > +struct mp_fds { > + int efd; > + > + union { > + /* fds for primary process */ > + struct { > + int listen; > + /* fds used to send msg to secondary process(es) */ > + int secondaries[MAX_SECONDARY_PROCS]; > + }; > + > + /* fds for secondary process */ > + struct { > + /* fds used to send msg to the primary process */ > + int primary; > + }; > + }; > +}; > + > +static struct mp_fds mp_fds; > + > +struct msg_hdr { > + char action_name[MAX_ACTION_NAME_LEN]; > + int fds_num; > + int len_params; > + char params[0]; > +} __rte_packed; > + > +static int > +add_sec_proc(int fd) > +{ > + int i; > + > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) > + if (mp_fds.secondaries[i] == -1) > + break; > + > + if 
(i >= MAX_SECONDARY_PROCS) > + return -1; > + > + mp_fds.secondaries[i] = fd; > + > + return i; > +} > + > +static void > +del_sec_proc(int fd) > +{ > + int i; > + > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { > + if (mp_fds.secondaries[i] == fd) { > + mp_fds.secondaries[i] = -1; > + break; > + } > + } > +} > + > +static int > +read_msg(int sockfd, char *buf, int buflen, int *fds, int fds_num) > +{ > + struct iovec iov; > + struct msghdr msgh; > + size_t fdsize = fds_num * sizeof(int); > + char control[CMSG_SPACE(fdsize)]; > + struct cmsghdr *cmsg; > + struct msg_hdr *hdr = (struct msg_hdr *)buf; > + int ret, total; > + > + /* read msg_hdr */ > + memset(&msgh, 0, sizeof(msgh)); > + iov.iov_base = hdr; > + iov.iov_len = sizeof(*hdr); > + > + msgh.msg_iov = &iov; > + msgh.msg_iovlen = 1; > + msgh.msg_control = control; > + msgh.msg_controllen = sizeof(control); > + > + ret = recvmsg(sockfd, &msgh, 0); > + if (ret != sizeof(struct msg_hdr)) { > + RTE_LOG(ERR, EAL, "recvmsg failed\n"); > + return ret; > + } > + > + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { > + RTE_LOG(ERR, EAL, "truncted msg\n"); > + return -1; > + } > + total = ret; > + > + /* read auxiliary FDs if any */ > + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; > + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { > + if ((cmsg->cmsg_level == SOL_SOCKET) && > + (cmsg->cmsg_type == SCM_RIGHTS)) { > + memcpy(fds, CMSG_DATA(cmsg), fdsize); > + break; > + } > + } > + > + /* read params */ > + if (hdr->len_params) { > + if (hdr->len_params > buflen - (int)sizeof(*hdr)) > + rte_exit(EXIT_FAILURE, "params too long\n"); > + > + ret = read(sockfd, &hdr->params, hdr->len_params); > + if (ret != hdr->len_params) > + rte_exit(EXIT_FAILURE, "failed to recv params\n"); > + > + total += ret; > + } > + > + RTE_LOG(INFO, EAL, "read msg: %s, %d\n", hdr->action_name, > + (int)sizeof(*hdr) + hdr->len_params); > + return total; > +} > + > +static int > +process_msg(int fd) > +{ > + int len; > + int params_len; > + char 
buf[MAX_MESSAGE_LENGTH]; > + int fds[SCM_MAX_FD]; > + struct msg_hdr *hdr; > + struct action_entry *entry; > + > + len = read_msg(fd, buf, MAX_MESSAGE_LENGTH, fds, SCM_MAX_FD); > + if (len <= 0) { > + RTE_LOG(ERR, EAL, "failed to read message: %s\n", > + strerror(errno)); > + return -1; > + } > + > + hdr = (struct msg_hdr *) buf; > + > + entry = find_action_entry_by_name(hdr->action_name); > + if (entry == NULL) { > + RTE_LOG(ERR, EAL, "cannot find action by: %s\n", > + hdr->action_name); > + return -1; > + } > + > + params_len = len - sizeof(struct msg_hdr); > + > + return entry->action(hdr->params, params_len, fds, hdr->fds_num); > +} > + > +static int > +add_secondary(void) > +{ > + int fd; > + struct epoll_event ev; > + > + while (1) { > + fd = accept(mp_fds.listen, NULL, NULL); > + if (fd < 0 && errno == EAGAIN) > + break; > + else if (fd < 0) { > + RTE_LOG(ERR, EAL, "primary failed to accept: %s\n", > + strerror(errno)); > + return -1; > + } > + > + ev.events = EPOLLIN | EPOLLRDHUP; > + ev.data.fd = fd; > + if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, fd, &ev) < 0) { > + RTE_LOG(ERR, EAL, "failed to add secondary: %s\n", > + strerror(errno)); > + break; > + } > + if (add_sec_proc(fd) < 0) { > + RTE_LOG(ERR, EAL, "too many secondary processes\n"); > + close(fd); > + break; > + } > + } > + > + return 0; > +} > + > +static void * > +mp_handler(void *arg __rte_unused) > +{ > + int fd; > + int i, n; > + struct epoll_event ev; > + struct epoll_event *events; > + int is_primary = rte_eal_process_type() == RTE_PROC_PRIMARY; > + > + ev.events = EPOLLIN | EPOLLRDHUP; > + ev.data.fd = (is_primary) ? 
mp_fds.listen : mp_fds.primary; > + if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) { > + RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n", > + strerror(errno)); > + exit(EXIT_FAILURE); > + } > + > + events = calloc(20, sizeof ev); > + > + while (1) { > + n = epoll_wait(mp_fds.efd, events, 20, -1); > + for (i = 0; i < n; i++) { > + if (is_primary && events[i].data.fd == mp_fds.listen) { > + if (events[i].events != EPOLLIN) { > + RTE_LOG(ERR, EAL, "what happens?\n"); > + exit(EXIT_FAILURE); > + } > + > + if (add_secondary() < 0) > + break; > + > + continue; > + } > + > + fd = events[i].data.fd; > + > + if ((events[i].events & EPOLLIN)) { > + if (process_msg(fd) < 0) { > + RTE_LOG(ERR, EAL, > + "failed to process msg\n"); > + if (!is_primary) > + exit(EXIT_FAILURE); > + } > + continue; > + } > + > + /* EPOLLERR, EPOLLHUP, etc */ > + if (is_primary) { > + RTE_LOG(ERR, EAL, "secondary exit: %d\n", fd); > + epoll_ctl(mp_fds.efd, EPOLL_CTL_DEL, fd, NULL); > + del_sec_proc(fd); > + close(fd); > + } else { > + RTE_LOG(ERR, EAL, "primary exits, so do I\n"); > + /* Exit secondary when primary exits? 
*/ > + exit(EXIT_FAILURE); > + } > + } > + } > + > + return NULL; > +} > + > +int > +rte_eal_mp_channel_init(void) > +{ > + int i, fd, ret; > + const char *path; > + struct sockaddr_un un; > + pthread_t tid; > + char thread_name[RTE_MAX_THREAD_NAME_LEN]; > + > + mp_fds.efd = epoll_create1(0); > + if (mp_fds.efd < 0) { > + RTE_LOG(ERR, EAL, "epoll_create1 failed\n"); > + return -1; > + } > + > + fd = socket(AF_UNIX, SOCK_STREAM, 0); > + if (fd < 0) { > + RTE_LOG(ERR, EAL, "Failed to create unix socket\n"); > + return -1; > + } > + > + memset(&un, 0, sizeof(un)); > + un.sun_family = AF_UNIX; > + path = eal_mp_unix_path(); > + strncpy(un.sun_path, path, sizeof(un.sun_path)); > + un.sun_path[sizeof(un.sun_path) - 1] = '\0'; > + > + if (rte_eal_process_type() == RTE_PROC_PRIMARY) { > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) > + mp_fds.secondaries[i] = -1; > + > + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) { > + RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n"); > + close(fd); > + return -1; > + } > + > + /* The file still exists since last run */ > + unlink(path); > + > + ret = bind(fd, (struct sockaddr *)&un, sizeof(un)); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n", > + path, strerror(errno)); > + close(fd); > + return -1; > + } > + RTE_LOG(INFO, EAL, "primary bind to %s\n", path); > + > + ret = listen(fd, 1024); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to listen: %s\n", > + strerror(errno)); > + close(fd); > + return -1; > + } > + mp_fds.listen = fd; > + } else { > + ret = connect(fd, (struct sockaddr *)&un, sizeof(un)); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to connect primary\n"); > + return -1; > + } > + mp_fds.primary = fd; > + } > + > + ret = pthread_create(&tid, NULL, mp_handler, NULL); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to create thead: %s\n", > + strerror(errno)); > + close(fd); > + close(mp_fds.efd); > + return -1; > + } > + > + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, > + "rte_mp_handle"); > + 
ret = rte_thread_setname(tid, thread_name); > + if (ret < 0) { > + RTE_LOG(ERR, EAL, "failed to set thead name\n"); > + close(fd); > + close(mp_fds.efd); > + return -1; > + } > + > + return 0; > +} > + > +static int > +send_msg(int fd, struct msghdr *p_msgh) > +{ > + int ret; > + > + do { > + ret = sendmsg(fd, p_msgh, 0); > + } while (ret < 0 && errno == EINTR); > + > + if (ret < 0) > + RTE_LOG(ERR, EAL, "failed to send msg: %s\n", strerror(errno)); > + > + return ret; > +} > + > +int > +rte_eal_mp_sendmsg(const char *action_name, > + const void *params, > + int len_params, > + int fds[], > + int fds_num) > +{ > + int i; > + int ret = 0; > + struct msghdr msgh; > + struct iovec iov; > + size_t fd_size = fds_num * sizeof(int); > + char control[CMSG_SPACE(fd_size)]; > + struct cmsghdr *cmsg; > + struct msg_hdr *msg; > + int len_msg; > + > + if (fds_num > SCM_MAX_FD) { > + RTE_LOG(ERR, EAL, > + "Cannot send more than %d FDs\n", SCM_MAX_FD); > + return -E2BIG; > + } > + > + len_msg = sizeof(struct msg_hdr) + len_params; > + if (len_msg > MAX_MESSAGE_LENGTH) { > + RTE_LOG(ERR, EAL, "Message is too long\n"); > + return -ENOMEM; > + } > + > + RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg); > + > + msg = malloc(len_msg); > + if (!msg) { > + RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n"); > + return -ENOMEM; > + } > + memset(msg, 0, len_msg); > + strcpy(msg->action_name, action_name); > + msg->fds_num = fds_num; > + msg->len_params = len_params; > + memcpy(msg->params, params, len_params); > + > + memset(&msgh, 0, sizeof(msgh)); > + memset(control, 0, sizeof(control)); > + > + iov.iov_base = (uint8_t *)msg; > + iov.iov_len = len_msg; > + > + msgh.msg_iov = &iov; > + msgh.msg_iovlen = 1; > + msgh.msg_control = control; > + msgh.msg_controllen = sizeof(control); > + > + cmsg = CMSG_FIRSTHDR(&msgh); > + cmsg->cmsg_len = CMSG_LEN(fd_size); > + cmsg->cmsg_level = SOL_SOCKET; > + cmsg->cmsg_type = SCM_RIGHTS; > + memcpy(CMSG_DATA(cmsg), fds, fd_size); > + > + 
if (rte_eal_process_type() == RTE_PROC_PRIMARY) { > + for (i = 0; i < MAX_SECONDARY_PROCS; ++i) { > + if (mp_fds.secondaries[i] == -1) > + continue; > + > + ret = send_msg(mp_fds.secondaries[i], &msgh); > + if (ret < 0) > + break; > + } > + } else { > + ret = send_msg(mp_fds.primary, &msgh); > + } > + > + free(msg); > + > + return ret; > +} > diff --git a/lib/librte_eal/common/eal_filesystem.h > b/lib/librte_eal/common/eal_filesystem.h > index 8acbd99..3d9514f 100644 > --- a/lib/librte_eal/common/eal_filesystem.h > +++ b/lib/librte_eal/common/eal_filesystem.h > @@ -67,6 +67,24 @@ eal_runtime_config_path(void) > return buffer; > } > > +/** Path of primary/secondary communication unix socket file. */ > +#define MP_UNIX_PATH_FMT "%s/.%s_unix" > +static inline const char * > +eal_mp_unix_path(void) > +{ > + static char buffer[PATH_MAX]; /* static so auto-zeroed */ > + const char *directory = default_config_dir; > + const char *home_dir = getenv("HOME"); > + > + if (getuid() != 0 && home_dir != NULL) > + directory = home_dir; > + snprintf(buffer, sizeof(buffer) - 1, MP_UNIX_PATH_FMT, > + directory, internal_config.hugefile_prefix); > + > + return buffer; > + > +} > + > /** Path of hugepage info file. */ > #define HUGEPAGE_INFO_FMT "%s/.%s_hugepage_info" > > diff --git a/lib/librte_eal/common/eal_private.h > b/lib/librte_eal/common/eal_private.h > index 462226f..60944f2 100644 > --- a/lib/librte_eal/common/eal_private.h > +++ b/lib/librte_eal/common/eal_private.h > @@ -224,4 +224,14 @@ int rte_eal_hugepage_attach(void); > */ > struct rte_bus *rte_bus_find_by_device_name(const char *str); > > +/** > + * Create the unix channel for primary/secondary communication. > + * > + * @return > + * 0 on success; > + * (<0) on failure. 
> + */ > + > +int rte_eal_mp_channel_init(void); > + > #endif /* _EAL_PRIVATE_H_ */ > diff --git a/lib/librte_eal/common/include/rte_eal.h > b/lib/librte_eal/common/include/rte_eal.h > index 8e4e71c..8776bcf 100644 > --- a/lib/librte_eal/common/include/rte_eal.h > +++ b/lib/librte_eal/common/include/rte_eal.h > @@ -215,6 +215,74 @@ int rte_eal_init(int argc, char **argv); > int rte_eal_primary_proc_alive(const char *config_file_path); > > /** > + * Action function typedef used by other components. > + * > + * As we create unix socket channel for primary/secondary communication, use > + * this function typedef to register action for coming messages. > + */ > +typedef int (*rte_eal_mp_t)(const void *params, int len, > + int fds[], int fds_num); > +/** > + * Register an action function for primary/secondary communication. > + * > + * Call this function to register an action, if the calling component wants > + * to response the messages from the corresponding component in its primary > + * process or secondary processes. > + * > + * @param action_name > + * The action_name argument plays as the nonredundant key to find the > action. > + * > + * @param action > + * The action argument is the function pointer to the action function. > + * > + * @return > + * - 0 on success. > + * - (<0) on failure. > + */ > +int rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action); > +/** > + * Unregister an action function for primary/secondary communication. > + * > + * Call this function to unregister an action if the calling component does > + * not want to response the messages from the corresponding component in its > + * primary process or secondary processes. > + * > + * @param action_name > + * The action_name argument plays as the nonredundant key to find the > action. > + * > + */ > +void rte_eal_mp_action_unregister(const char *name); > + > +/** > + * Send a message to the primary process or the secondary processes. 
> + * > + * This function will send a message which will be responsed by the action > + * identified by action_name of the process on the other side. > + * > + * @param action_name > + * The action_name argument is used to identify which action will be used. > + * > + * @param params > + * The params argument contains the customized message. > + * > + * @param len_params > + * The len_params argument is the length of the customized message. > + * > + * @param fds > + * The fds argument is an array of fds sent with sendmsg. > + * > + * @param fds_num > + * The fds_num argument is number of fds to be sent with sendmsg. > + * > + * @return > + * - (>=0) on success. > + * - (<0) on failure. > + */ > +int > +rte_eal_mp_sendmsg(const char *action_name, const void *params, > + int len_params, int fds[], int fds_num); > + > +/** > * Usage function typedef used by the application usage function. > * > * Use this function typedef to define and call > rte_set_application_usage_hook() > diff --git a/lib/librte_eal/linuxapp/eal/eal.c > b/lib/librte_eal/linuxapp/eal/eal.c > index 229eec9..a84eab4 100644 > --- a/lib/librte_eal/linuxapp/eal/eal.c > +++ b/lib/librte_eal/linuxapp/eal/eal.c > @@ -896,6 +896,15 @@ rte_eal_init(int argc, char **argv) > > eal_check_mem_on_local_socket(); > > + if (rte_eal_mp_channel_init() < 0) { > + rte_eal_init_alert("failed to init mp channel\n"); > + rte_errno = EFAULT; > + return -1; > + } > + > + if (eal_plugins_init() < 0) > + rte_eal_init_alert("Cannot init plugins\n"); > + > eal_thread_init_master(rte_config.master_lcore); > > ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN); > diff --git a/lib/librte_eal/rte_eal_version.map > b/lib/librte_eal/rte_eal_version.map > index f4f46c1..6762397 100644 > --- a/lib/librte_eal/rte_eal_version.map > +++ b/lib/librte_eal/rte_eal_version.map > @@ -235,4 +235,26 @@ EXPERIMENTAL { > rte_service_set_stats_enable; > rte_service_start_with_defaults; > > +} DPDK_17.08; > + > +DPDK_17.11 { > + 
global: > + > + rte_bus_get_iommu_class; > + rte_eal_iova_mode; > + rte_eal_mbuf_default_mempool_ops; > + rte_lcore_has_role; > + rte_memcpy_ptr; > + rte_pci_get_iommu_class; > + rte_pci_match; > + > +} DPDK_17.08; > + > +DPDK_18.02 { > + global: > + > + rte_eal_mp_action_register; > + rte_eal_mp_action_unregister; > + rte_eal_mp_sendmsg; > + > } DPDK_17.11; > -- > 2.7.4