On 30-Nov-17 6:44 PM, Jianfeng Tan wrote:
Previouly, there are three channels for multi-process
(i.e., primary/secondary) communication.
   1. Config-file based channel, in which, the primary process writes
      info into a pre-defined config file, and the secondary process
      reads info out.
   2. vfio submodule has its own channel based on unix socket for the
      secondary process to get container fd and group fd from the
      primary process.
   3. pdump submodule also has its own channel based on unix socket for
      packet dump.

It'll be good to have a generic communication channel for multi-process
communication to accomodate the requirements including:
   a. Secondary wants to send info to primary, for example, secondary
      would like to send request (about some specific vdev to primary).
   b. Sending info at any time, instead of just initialization time.
   c. Share FDs with the other side, for vdev like vhost, related FDs
      (memory region, kick) should be shared.
   d. A send message request needs the other side to response immediately.

This patch proposes to create a communication channel, as an unix
socket connection, for above requirements. Primary will listen on
the unix socket; secondary will connect this socket to talk.

Three new APIs are added:

   1. rte_eal_mp_action_register is used to register an action,
      indexed by a string; if the calling component wants to
      response the messages from the corresponding component in
      its primary process or secondary processes.
   2. rte_eal_mp_action_unregister is used to unregister the action
      if the calling component does not want to response the messages.
   3. rte_eal_mp_sendmsg is used to send a message.

Signed-off-by: Jianfeng Tan <jianfeng....@intel.com>
---

<...snip...>

+
+int
+rte_eal_mp_action_register(const char *action_name, rte_eal_mp_t action)
+{
+       struct action_entry *entry = malloc(sizeof(struct action_entry));
+
+       if (entry == NULL)
+               return -ENOMEM;
+
+       if (find_action_entry_by_name(action_name) != NULL)
+               return -EEXIST;

This should probably do a free(entry).

+
+       strncpy(entry->action_name, action_name, MAX_ACTION_NAME_LEN);
+       entry->action = action;
+       TAILQ_INSERT_TAIL(&action_entry_list, entry, next);
+       return 0;
+}
+

<...snip...>

+
+static int
+add_secondary(void)
+{
+       int fd;
+       struct epoll_event ev;
+
+       while (1) {
+               fd = accept(mp_fds.listen, NULL, NULL);
+               if (fd < 0 && errno == EAGAIN)
+                       break;
+               else if (fd < 0) {
+                       RTE_LOG(ERR, EAL, "primary failed to accept: %s\n",
+                               strerror(errno));
+                       return -1;
+               }
+
+               ev.events = EPOLLIN | EPOLLRDHUP;
+               ev.data.fd = fd;
+               if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, fd, &ev) < 0) {
+                       RTE_LOG(ERR, EAL, "failed to add secondary: %s\n",
+                               strerror(errno));
+                       break;
+               }
+               if (add_sec_proc(fd) < 0) {
+                       RTE_LOG(ERR, EAL, "too many secondary processes\n");
+                       close(fd);
+                       break;
+               }
+       }
+
+       return 0;
+}
+
+static void *
+mp_handler(void *arg __rte_unused)
+{
+       int fd;
+       int i, n;
+       struct epoll_event ev;
+       struct epoll_event *events;
+       int is_primary = rte_eal_process_type() == RTE_PROC_PRIMARY;
+
+       ev.events = EPOLLIN | EPOLLRDHUP;
+       ev.data.fd = (is_primary) ? mp_fds.listen : mp_fds.primary;
+       if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) {
+               RTE_LOG(ERR, EAL, "failed to epoll_ctl: %s\n",
+                       strerror(errno));
+               exit(EXIT_FAILURE);

rte_exit?

+       }
+
+       events = calloc(20, sizeof ev);
+
+       while (1) {
+               n = epoll_wait(mp_fds.efd, events, 20, -1);
+               for (i = 0; i < n; i++) {
+                       if (is_primary && events[i].data.fd == mp_fds.listen) {
+                               if (events[i].events != EPOLLIN) {
+                                       RTE_LOG(ERR, EAL, "what happens?\n");

More descriptive error message would be nice :)

+                                       exit(EXIT_FAILURE);

rte_exit?

+                               }
+
+                               if (add_secondary() < 0)
+                                       break;

Doing epoll_ctl in multiple different places hurts readability IMO. Might be a good idea to refactor add_secondary and mp_handler in a way that keeps all epoll handling in one place.

+
+                               continue;
+                       }
+
+                       fd = events[i].data.fd;
+
+                       if ((events[i].events & EPOLLIN)) {
+                               if (process_msg(fd) < 0) {
+                                       RTE_LOG(ERR, EAL,
+                                               "failed to process msg\n");
+                                       if (!is_primary)
+                                               exit(EXIT_FAILURE);

rte_exit()?

+                               }
+                               continue;
+                       }
+
+                       /* EPOLLERR, EPOLLHUP, etc */
+                       if (is_primary) {
+                               RTE_LOG(ERR, EAL, "secondary exit: %d\n", fd);
+                               epoll_ctl(mp_fds.efd, EPOLL_CTL_DEL, fd, NULL);
+                               del_sec_proc(fd);
+                               close(fd);
+                       } else {
+                               RTE_LOG(ERR, EAL, "primary exits, so do I\n");
+                               /* Exit secondary when primary exits? */
+                               exit(EXIT_FAILURE);

This is changing previous behavior. I don't think exiting secondary when primary exits is something we want to do, so i would just print an error, but not exit the process.

+                       }
+               }
+       }
+
+       return NULL;
+}
+
+int
+rte_eal_mp_channel_init(void)
+{
+       int i, fd, ret;
+       const char *path;
+       struct sockaddr_un un;
+       pthread_t tid;
+       char thread_name[RTE_MAX_THREAD_NAME_LEN];
+
+       mp_fds.efd = epoll_create1(0);
+       if (mp_fds.efd < 0) {
+               RTE_LOG(ERR, EAL, "epoll_create1 failed\n");
+               return -1;
+       }
+
+       fd = socket(AF_UNIX, SOCK_STREAM, 0);
+       if (fd < 0) {
+               RTE_LOG(ERR, EAL, "Failed to create unix socket\n");
+               return -1;
+       }
+
+       memset(&un, 0, sizeof(un));
+       un.sun_family = AF_UNIX;
+       path = eal_mp_unix_path();
+       strncpy(un.sun_path, path, sizeof(un.sun_path));
+       un.sun_path[sizeof(un.sun_path) - 1] = '\0';
+
+       if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+               for (i = 0; i < MAX_SECONDARY_PROCS; ++i)
+                       mp_fds.secondaries[i] = -1;
+
+               if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) {
+                       RTE_LOG(ERR, EAL, "cannot set nonblocking mode\n");
+                       close(fd);
+                       return -1;
+               }
+
+               /* The file still exists since last run */
+               unlink(path);
+
+               ret = bind(fd, (struct sockaddr *)&un, sizeof(un));
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to bind to %s: %s\n",
+                               path, strerror(errno));
+                       close(fd);
+                       return -1;
+               }
+               RTE_LOG(INFO, EAL, "primary bind to %s\n", path);
+
+               ret = listen(fd, 1024);
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to listen: %s\n",
+                               strerror(errno));
+                       close(fd);
+                       return -1;
+               }
+               mp_fds.listen = fd;
+       } else {
+               ret = connect(fd, (struct sockaddr *)&un, sizeof(un));
+               if (ret < 0) {
+                       RTE_LOG(ERR, EAL, "failed to connect primary\n");
+                       return -1;

Do we want to prevent secondary from launching if it can't connect to primary? Some use cases might rely on previous behavior. Maybe instead add some checks in handling functions to ensure that we have a valid connection to the primary before doing anything?

+               }
+               mp_fds.primary = fd;
+       }
+
+       ret = pthread_create(&tid, NULL, mp_handler, NULL);
+       if (ret < 0) {
+               RTE_LOG(ERR, EAL, "failed to create thead: %s\n",
+                       strerror(errno));
+               close(fd);
+               close(mp_fds.efd);
+               return -1;
+       }

<...snip...>

+       if (fds_num > SCM_MAX_FD) {
+               RTE_LOG(ERR, EAL,
+                       "Cannot send more than %d FDs\n", SCM_MAX_FD);
+               return -E2BIG;
+       }
+
+       len_msg = sizeof(struct msg_hdr) + len_params;
+       if (len_msg > MAX_MESSAGE_LENGTH) {
+               RTE_LOG(ERR, EAL, "Message is too long\n");
+               return -ENOMEM;

Nitpicking, but is this really -ENOMEM? Shouldn't this be -EINVAL or -E2BIG? Also, this is external API - maybe return -1 and set rte_errno?

+       }
+
+       RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg);

Do we want this as INFO, not DEBUG?

+
+       msg = malloc(len_msg);
+       if (!msg) {
+               RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n");
+               return -ENOMEM;
+       }

<...snip...>

/**
+ * Action function typedef used by other components.
+ *
+ * As we create unix socket channel for primary/secondary communication, use
+ * this function typedef to register action for coming messages.
+ */
+typedef int (*rte_eal_mp_t)(const void *params, int len,
+                           int fds[], int fds_num);

Nitpicking, but probably needs newlines before comments, here and after next function definition.

+/**
+ * Register an action function for primary/secondary communication.
+ *
+ * Call this function to register an action, if the calling component wants
+ * to response the messages from the corresponding component in its primary
+ * process or secondary processes.
+ *
+ * @param action_name
+ *   The action_name argument plays as the nonredundant key to find the action.
+ *
+ * @param action
+ *   The action argument is the function pointer to the action function.
+ *
+ * @return
+ *  - 0 on success.
+ *  - (<0) on failure.
+ */

<...snip...>

diff --git a/lib/librte_eal/linuxapp/eal/eal.c 
b/lib/librte_eal/linuxapp/eal/eal.c
index 229eec9..a84eab4 100644
--- a/lib/librte_eal/linuxapp/eal/eal.c
+++ b/lib/librte_eal/linuxapp/eal/eal.c
@@ -896,6 +896,15 @@ rte_eal_init(int argc, char **argv)
eal_check_mem_on_local_socket(); + if (rte_eal_mp_channel_init() < 0) {
+               rte_eal_init_alert("failed to init mp channel\n");
+               rte_errno = EFAULT;
+               return -1;
+       }

As noted above, maybe only fail if it's primary process?

+
+       if (eal_plugins_init() < 0)
+               rte_eal_init_alert("Cannot init plugins\n");

This is probably a leftover of some other patch?

+
        eal_thread_init_master(rte_config.master_lcore);
ret = eal_thread_dump_affinity(cpuset, RTE_CPU_AFFINITY_STR_LEN);
diff --git a/lib/librte_eal/rte_eal_version.map 
b/lib/librte_eal/rte_eal_version.map
index f4f46c1..6762397 100644
--- a/lib/librte_eal/rte_eal_version.map
+++ b/lib/librte_eal/rte_eal_version.map
@@ -235,4 +235,26 @@ EXPERIMENTAL {
        rte_service_set_stats_enable;
        rte_service_start_with_defaults;
+} DPDK_17.08;
+
+DPDK_17.11 {
+       global:
+
+       rte_bus_get_iommu_class;
+       rte_eal_iova_mode;
+       rte_eal_mbuf_default_mempool_ops;
+       rte_lcore_has_role;
+       rte_memcpy_ptr;
+       rte_pci_get_iommu_class;
+       rte_pci_match;
+
+} DPDK_17.08;
+

Same here, this looks like leftovers of rebase.

+DPDK_18.02 {
+       global:
+
+       rte_eal_mp_action_register;
+       rte_eal_mp_action_unregister;
+       rte_eal_mp_sendmsg;
+
  } DPDK_17.11;



--
Thanks,
Anatoly

Reply via email to