Hey all

We're trying to nail down some issues with using named pipes

The issue we're getting is deterministic (ENXIO) but it is not this one, but
we think this issue is worth reporting anyway


We're using the branch topic/fifo



The program explained in short is:


- One main (parent) pipe that lives through the whole execution

- The main process forks 'children' child-processes that creates their own
(unique) named pipes

- Each child forks 'children' grans-child-processes that just writes some
bogus messages back to the unique child pipe

- Each child writes a bogus message back to the main process

- Every process creates a write and a read descriptor, but the write
descriptor is just a dummy descriptor (to somehow keep the pipe alive
without being bombarded with signals)

- This iterates a few times


Some of the constructs may be a bit confusing and maybe not relevant to this
issue, but I left them in the test-program anyway 




Issue #1 sometimes occurs in line 35 (printed as 36) we get ENOENT (No such
file or directory) despite that the pipe was just created and the read
descriptor successfully was opened

   *wfd = open(name, O_WRONLY);


Issue #2 sometimes occurs in line 73 (printed as 74) we get EBUSY (Device or
resource busy) when attempting to open a non blocking descriptor

   const int wfd = open(name, O_WRONLY | O_NONBLOCK);


Issue #3 sometimes occurs somewhere unknown and the main process just get
stuck (I've failed to reproduced that with strace or so) and to not have any
more input so maybe this should be left out ?



I hope this is well described and hopefully it's enough to reproduce the
issue(s) and hopefully is not due to a fault test case ;-)



Kristian

#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>

void print_error(const int line, const int error)
{
   printf("%d\tline %d\t%s\n", getpid(), line, strerror(error));
}

#define HANDLE_ERROR(result) do {if (result < 0){print_error(__LINE__, 
errno);return result;}} while (0);

#define HANDLE_RESULT(result) do {if (result < 0){return result;}} while (0);

int make_reader(const char *const name, int *const rfd, int *const wfd)
{
   //printf("%d\tmake\t%s\n", getpid(), name);

   /* In case it exists */
   remove(name);

   const int mode = O_CREAT | S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | 
S_IWOTH;

   const int fifo = mkfifo(name, mode) ? -1 : 0;
   HANDLE_ERROR(fifo);

   *rfd = open(name, O_RDONLY | O_NONBLOCK);
   HANDLE_ERROR(*rfd);

   *wfd = open(name, O_WRONLY);
   HANDLE_ERROR(*wfd);

   return 0;
}

int stop_reader(const char *const name, const int *const rfd, const int *const 
wfd)
{
   //printf("%d\tstop\t%s\n", getpid(), name);

   HANDLE_ERROR(close(*wfd));
   HANDLE_ERROR(close(*rfd));

   HANDLE_ERROR(unlink(name));

   return 0;
}

int wait_child(const int pid)
{
   int status = 0;
   HANDLE_ERROR(waitpid(pid, &status, 0));
   if (WIFEXITED(status))
   {
      //printf("%d\tchild %d exited\n", getpid(), pid);
   }
   return 0;
}

int send_messages(const char *const name, const int messages)
{
   //printf("%d\tsend\t%s\n", getpid(), name);

   char message[PIPE_BUF];
   memset(message, ' ', sizeof message);

   for (int index = 0; index < messages; ++index)
   {
      const int wfd = open(name, O_WRONLY | O_NONBLOCK);
      HANDLE_ERROR(wfd);
      if (::write(wfd, message, sizeof message) == -1 && errno == EAGAIN)
      {
         //printf("%d\tsend\t%s\t[%d]\n", getpid(), name, index);
         HANDLE_ERROR(fcntl(wfd, F_SETFL, ~O_NONBLOCK & fcntl(wfd, F_GETFL)));
         HANDLE_ERROR(::write(wfd, message, sizeof message));
      }
      HANDLE_ERROR(close(wfd));
   }

   return 0;
}

int read_messages(const char *const name, const int rfd, const int messages)
{
   //printf("%d\tread\t%s\n", getpid(), name);

   char message[PIPE_BUF] = {};

   for (int index = 0; index < messages; ++index)
   {
      HANDLE_ERROR(fcntl(rfd, F_SETFL, O_NONBLOCK | fcntl(rfd, F_GETFL)));
      if (::read(rfd, message, sizeof message) == -1 && errno == EAGAIN)
      {
         //printf("%d\tread\t%s\t[%d]\n", getpid(), name, index);
         HANDLE_ERROR(fcntl(rfd, F_SETFL, ~O_NONBLOCK & fcntl(rfd, F_GETFL)));
         HANDLE_ERROR(::read(rfd, &message, sizeof(message)));
      }
   }

   return 0;
}

int main()
{
   const int messages = 5;
   const int children = 25;

   const char *name_parent = "/tmp/pipe_parent";

   int rfd, wfd;
   HANDLE_RESULT(make_reader(name_parent, &rfd, &wfd));

   for (int lap = 1; lap < 5; ++lap)
   {
      printf("lap %d\n", lap);

      int first_generation_pids[children];

      for (int idx = 0; idx < children; ++idx)
      {
         const int first_generation_pid = fork();
         HANDLE_ERROR(first_generation_pid);

         if (first_generation_pid == 0)
         {
            char name_child[32];
            snprintf(name_child, sizeof name_child, "/tmp/pipe_child_%d", idx);
            int rfd, wfd;
            HANDLE_RESULT(make_reader(name_child, &rfd, &wfd));

            int second_generation_pids[children];

            for (int idx = 0; idx < children; ++idx)
            {
               const int second_generation_pid = fork();
               HANDLE_ERROR(second_generation_pid);

               if (second_generation_pid == 0)
               {
                  return send_messages(name_child, messages);
               }

               second_generation_pids[idx] = second_generation_pid;
            }

            HANDLE_RESULT(read_messages(name_child, rfd, children * messages));

            for (int idx = 0; idx < children; ++idx)
            {
               wait_child(second_generation_pids[idx]);
            }

            HANDLE_RESULT(send_messages(name_parent, messages));

            HANDLE_RESULT(stop_reader(name_child, &rfd, &wfd));

            return 0;
         }

         first_generation_pids[idx] = first_generation_pid;
      }

      HANDLE_RESULT(read_messages(name_parent, rfd, children * messages));

      for (int idx = 0; idx < children; ++idx)
      {
         wait_child(first_generation_pids[idx]);
      }
   }

   HANDLE_RESULT(stop_reader(name_parent, &rfd, &wfd));

   return 0;
}
--
Problem reports:      https://cygwin.com/problems.html
FAQ:                  https://cygwin.com/faq/
Documentation:        https://cygwin.com/docs.html
Unsubscribe info:     https://cygwin.com/ml/#unsubscribe-simple

Reply via email to