I wrote a sample variant of hiperfifo written in C++ and using libevent2 which I would like to submit.
I think it is nicely structured and easier to undersand than the C version.

-- Beppe Attardi

/***************************************************************************
 *                                  _   _ ____  _
 *  Project                     ___| | | |  _ \| |
 *                             / __| | | | |_) | |
 *                            | (__| |_| |  _ <| |___
 *                             \___|\___/|_| \_\_____|
 *
 * Copyright (C) 1998 - 2011, Daniel Stenberg, <[email protected]>, et al.
 *
 * This software is licensed as described in the file COPYING, which
 * you should have received as part of this distribution. The terms
 * are also available at http://curl.haxx.se/docs/copyright.html.
 *
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
 * copies of the Software, and permit persons to whom the Software is
 * furnished to do so, under the terms of the COPYING file.
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 * KIND, either express or implied.
 *
 ***************************************************************************/
/* Example application source code using the multi socket interface to
   download many files at once.

Written by Jeff Pohlmeyer
Ported to C++ by Giuseppe Attardi.

Requires libevent and a (POSIX?) system that has mkfifo().

This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
sample programs.

When running, the program creates the named pipe "hiper.fifo"

Whenever there is input into the fifo, the program reads the input as a list
of URL's and creates some new easy handles to fetch each URL via the
curl_multi "hiper" API.


Thus, you can try a single URL:
  % echo http://www.yahoo.com > hiper.fifo

Or a whole bunch of them:
  % cat my-url-list > hiper.fifo

The fifo buffer is handled almost instantly, so you can even add more URL's
while the previous requests are still being downloaded.

This is purely a demo app, all retrieved data is simply discarded by the write
callback.

*/

#include <string.h>             // strdup()
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>             // unlink()
#include <curl/curl.h>
#include <event.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>


#define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */

static void event_cb(int fd, short kind, void *userp);
static int prog_cb(void *p, double dltotal, double dlnow, double ult, double 
uln);
static size_t write_cb(void* ptr, size_t size, size_t nmemb, void* data);
static int sock_cb(CURL* e, curl_socket_t s, int what, void* cbp, void* sockp);
static int multi_timer_cb(CURLM* multi, long timeout_ms, void* data);

static void check_mcode(const char *where, CURLMcode code);

/* Retriever, dealing with multiple connections */
struct Retriever
{
  Retriever() :
    multi(curl_multi_init()),
    still_running(0)
  {
    /* setup the generic multi interface options we want */
    curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
    curl_multi_setopt(multi, CURLMOPT_SOCKETDATA, this);
    curl_multi_setopt(multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
    curl_multi_setopt(multi, CURLMOPT_TIMERDATA, this);
  }

  ~Retriever() {
    curl_multi_cleanup(multi);
  }


  /* Check for completed transfers, and remove their easy handles */
  void check_completion();

  event timer_event;
  CURLM* multi;
  int   still_running;
};


/* Information associated with a specific easy handle */
struct Connection
{
  /* Create an easy handle, and add it to the curl_multi of the Retriever */
  Connection(char const* url, Retriever* retriever) :
    easy(curl_easy_init()),
    url(strdup(url)),
    action(0),
    evset(false)
  {
    error[0] = '\0';

    if (!easy) {
      fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
      exit(2);
    }
    curl_easy_setopt(easy, CURLOPT_URL, this->url);
    curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(easy, CURLOPT_WRITEDATA, this);
    curl_easy_setopt(easy, CURLOPT_VERBOSE, 1L);
    curl_easy_setopt(easy, CURLOPT_ERRORBUFFER, error);
    curl_easy_setopt(easy, CURLOPT_PRIVATE, this);
    curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 0L);
    curl_easy_setopt(easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
    curl_easy_setopt(easy, CURLOPT_PROGRESSDATA, this);

    CURLMcode rc = curl_multi_add_handle(retriever->multi, easy);
    check_mcode("new Connection: curl_multi_add_handle", rc);

    /* note that the add_handle() will set a time-out to trigger very soon so
       that the necessary socket_action() call will be called by this app */
  }

  ~Connection() { free(url); }

  /* Activate the event to monitor the socket for this connection */
  void setevent(curl_socket_t sockfd, int act, Retriever* retriever) {
    int kind =
      (act & CURL_POLL_IN ? EV_READ : 0) |
      (act & CURL_POLL_OUT ? EV_WRITE : 0) | EV_PERSIST;

    action = act;
    if (evset)
      event_del(&ev);
    event_set(&ev, sockfd, kind, event_cb, retriever);
    evset = true;
    event_add(&ev, NULL);
  }

  CURL*         easy;
  char*         url;
  char          error[CURL_ERROR_SIZE];
  int           action;
  event         ev;             // event monitoring connection
  bool          evset;
};

void Retriever::check_completion()
{
  fprintf(MSG_OUT, "REMAINING: %d\n", still_running);
  CURLMsg* msg;
  int msgs_left;

  while ((msg = curl_multi_info_read(multi, &msgs_left))) {
    if (msg->msg == CURLMSG_DONE) {
      CURL* easy = msg->easy_handle;
      CURLcode res = msg->data.result;
      Connection* conn;
      curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
      char *eff_url;
      curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
      fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
      curl_multi_remove_handle(multi, easy);
      curl_easy_cleanup(easy);
      delete conn;
    }
  }
}


/* Update the event timer after curl_multi library calls */
static int multi_timer_cb(CURLM* multi, long timeout_ms, void* data)
{
  struct timeval timeout;
  Retriever* retr = (Retriever*)data;
  (void)multi; /* unused */

  timeout.tv_sec = timeout_ms / 1000;
  timeout.tv_usec = (timeout_ms % 1000) * 1000;
  fprintf(MSG_OUT, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
  evtimer_add(&retr->timer_event, &timeout);
  return 0;
}

/* Die if we get a bad CURLMcode somewhere */
static void check_mcode(const char* where, CURLMcode code)
{
  if (CURLM_OK != code) {
    const char* s = curl_multi_strerror(code);
    fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
    if (CURLM_BAD_SOCKET != code)      // ignore this error
      exit(code);
  }
}


/* Called by libevent when we get action on a multi socket */
static void event_cb(int fd, short kind, void* userp)
{
  Retriever* retr = (Retriever*)userp;

  int action =
    (kind & EV_READ ? CURL_CSELECT_IN : 0) |
    (kind & EV_WRITE ? CURL_CSELECT_OUT : 0);

  CURLMcode rc = curl_multi_socket_action(retr->multi, fd, action, 
&retr->still_running);
  check_mcode("event_cb: curl_multi_socket_action", rc);

  retr->check_completion();
  if (retr->still_running <= 0) {
    fprintf(MSG_OUT, "last transfer done, kill timeout\n");
    if (evtimer_pending(&retr->timer_event, NULL))
      evtimer_del(&retr->timer_event);
  }
}


/* Called by libevent when our timeout expires */
static void timer_cb(int fd, short kind, void* userp)
{
  Retriever* retr = (Retriever*)userp;
  (void)fd;
  (void)kind;

  // set second argument to CURL_SOCKET_TIMEOUT
  // (@see http://curl.haxx.se/libcurl/c/curl_multi_socket_action.html)
  CURLMcode rc = curl_multi_socket_action(retr->multi,
                                          CURL_SOCKET_TIMEOUT, 0, 
&retr->still_running);
  check_mcode("timer_cb: curl_multi_socket_action", rc);
  retr->check_completion();
}


/* CURLMOPT_SOCKETFUNCTION */
static int sock_cb(CURL* e, curl_socket_t s, int what, void* cbp, void* sockp)
{
  Retriever* retr = (Retriever*)cbp;
  static const char *whatstr[] = { "none", "IN", "OUT", "INOUT", "REMOVE" };

  fprintf(MSG_OUT,
          "socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);

  Connection* conn;
  curl_easy_getinfo(e, CURLINFO_PRIVATE, &conn);
  if (what == CURL_POLL_REMOVE) {
    fprintf(MSG_OUT, "\n");
    if (conn->evset)
      event_del(&conn->ev);
  } else {
    if (!conn->evset)
      // first call
      fprintf(MSG_OUT, "Sending data: %s\n", whatstr[what]);
    else
      fprintf(MSG_OUT,
              "Changing action from %s to %s\n",
              whatstr[conn->action], whatstr[what]);
    conn->setevent(s, what, retr);
  }
  return 0;
}


/* CURLOPT_WRITEFUNCTION */
static size_t write_cb(void* ptr, size_t size, size_t nmemb, void* data)
{
  size_t realsize = size * nmemb;
  Connection *conn = (Connection*) data;
  (void)ptr;
  (void)conn;
  return realsize;
}


/* CURLOPT_PROGRESSFUNCTION */
static int prog_cb(void* p, double dltotal, double dlnow, double ult,
                   double uln)
{
  Connection *conn = (Connection *)p;
  (void)ult;
  (void)uln;

  fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
  return 0;
}


/* This gets called whenever data is received from the fifo */
static void fifo_cb(int fd, short event, void* arg)
{
  Retriever* r = (Retriever*)arg;
  (void)event; /* unused */

  FILE* fp = fdopen(fd, "r");
  char *line = NULL;
  size_t len = 0;
  ssize_t read;
  while ((read = getline(&line, &len, fp)) != -1) {
    line[read-1] = '\0';        // trim nl
    /* if we read a URL, go get it! */
    Connection* conn = new Connection(line, r);
    fprintf(MSG_OUT,
            "Adding easy %p to multi %p (%s)\n", conn->easy, r->multi, line);
  }
  free(line);
}

/* Create a named pipe */
static int init_fifo()
{
  struct stat st;
  static const char *fifo = "hiper.fifo";

  fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
  if (lstat(fifo, &st) == 0) {
    if ((st.st_mode & S_IFMT) == S_IFREG) {
      errno = EEXIST;
      perror("lstat");
      return -1;
    }
  }
  unlink(fifo);
  if (mkfifo(fifo, 0600) == -1) {
    perror("mkfifo");
    return -1;
  }
  int sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
  if (sockfd == -1) {
    perror("open");
    return -1;
  }
  fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
  return sockfd;
}

int main(int argc, char **argv)
{
  Retriever retr;
  (void)argc;
  (void)argv;

  event_init();

  evtimer_set(&retr.timer_event, timer_cb, &retr);

  /* create fifo */
  int sockfd = init_fifo();
  if (sockfd == -1)
    return -1;
  /* tell libevent to monitor it */
  event fifo_event;
  event_set(&fifo_event, sockfd, EV_READ | EV_PERSIST, fifo_cb, &retr);
  event_add(&fifo_event, NULL);

  /* we don't call any curl_multi_socket*() function yet as we have no handles
     added! */

  event_dispatch();
  return 0;
}
-------------------------------------------------------------------
List admin: http://cool.haxx.se/list/listinfo/curl-library
Etiquette:  http://curl.haxx.se/mail/etiquette.html

Reply via email to