I wrote a variant of the hiperfifo sample in C++, using libevent2,
which I would like to submit.
I think it is nicely structured and easier to understand than the C version.
-- Beppe Attardi
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) 1998 - 2011, Daniel Stenberg, <[email protected]>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at http://curl.haxx.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
***************************************************************************/
/* Example application source code using the multi socket interface to
download many files at once.
Written by Jeff Pohlmeyer
Ported to C++ by Giuseppe Attardi.
Requires libevent and a (POSIX?) system that has mkfifo().
This is an adaptation of libcurl's "hipev.c" and libevent's "event-test.c"
sample programs.
When running, the program creates the named pipe "hiper.fifo"
Whenever there is input into the fifo, the program reads the input as a list
of URL's and creates some new easy handles to fetch each URL via the
curl_multi "hiper" API.
Thus, you can try a single URL:
% echo http://www.yahoo.com > hiper.fifo
Or a whole bunch of them:
% cat my-url-list > hiper.fifo
The fifo buffer is handled almost instantly, so you can even add more URL's
while the previous requests are still being downloaded.
This is purely a demo app, all retrieved data is simply discarded by the write
callback.
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h> // fprintf(), getline(), clearerr(), perror()
#include <stdlib.h>
#include <string.h> // strdup()
#include <sys/stat.h>
#include <sys/time.h>
#include <unistd.h> // unlink()
#include <curl/curl.h>
#include <event.h>
#define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */
/* Forward declarations: these callbacks are referenced by the Retriever and
   Connection definitions below before their own definitions appear. */
/* libevent callback for activity on a socket watched on behalf of curl */
static void event_cb(int fd, short kind, void *userp);
/* CURLOPT_PROGRESSFUNCTION callback, prints download progress */
static int prog_cb(void *p, double dltotal, double dlnow, double ult, double
uln);
/* CURLOPT_WRITEFUNCTION callback, discards the received data */
static size_t write_cb(void* ptr, size_t size, size_t nmemb, void* data);
/* CURLMOPT_SOCKETFUNCTION callback, (un)registers socket events */
static int sock_cb(CURL* e, curl_socket_t s, int what, void* cbp, void* sockp);
/* CURLMOPT_TIMERFUNCTION callback, (re)arms the Retriever's timer event */
static int multi_timer_cb(CURLM* multi, long timeout_ms, void* data);
/* Reports a non-OK CURLMcode and exits for all but CURLM_BAD_SOCKET */
static void check_mcode(const char *where, CURLMcode code);
/* Retriever, dealing with multiple connections */
struct Retriever
{
Retriever() :
multi(curl_multi_init()),
still_running(0)
{
/* setup the generic multi interface options we want */
curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
curl_multi_setopt(multi, CURLMOPT_SOCKETDATA, this);
curl_multi_setopt(multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
curl_multi_setopt(multi, CURLMOPT_TIMERDATA, this);
}
~Retriever() {
curl_multi_cleanup(multi);
}
/* Check for completed transfers, and remove their easy handles */
void check_completion();
event timer_event;
CURLM* multi;
int still_running;
};
/* Information associated with a specific easy handle */
struct Connection
{
/* Create an easy handle, and add it to the curl_multi of the Retriever */
Connection(char const* url, Retriever* retriever) :
easy(curl_easy_init()),
url(strdup(url)),
action(0),
evset(false)
{
error[0] = '\0';
if (!easy) {
fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
exit(2);
}
curl_easy_setopt(easy, CURLOPT_URL, this->url);
curl_easy_setopt(easy, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(easy, CURLOPT_WRITEDATA, this);
curl_easy_setopt(easy, CURLOPT_VERBOSE, 1L);
curl_easy_setopt(easy, CURLOPT_ERRORBUFFER, error);
curl_easy_setopt(easy, CURLOPT_PRIVATE, this);
curl_easy_setopt(easy, CURLOPT_NOPROGRESS, 0L);
curl_easy_setopt(easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
curl_easy_setopt(easy, CURLOPT_PROGRESSDATA, this);
CURLMcode rc = curl_multi_add_handle(retriever->multi, easy);
check_mcode("new Connection: curl_multi_add_handle", rc);
/* note that the add_handle() will set a time-out to trigger very soon so
that the necessary socket_action() call will be called by this app */
}
~Connection() { free(url); }
/* Activate the event to monitor the socket for this connection */
void setevent(curl_socket_t sockfd, int act, Retriever* retriever) {
int kind =
(act & CURL_POLL_IN ? EV_READ : 0) |
(act & CURL_POLL_OUT ? EV_WRITE : 0) | EV_PERSIST;
action = act;
if (evset)
event_del(&ev);
event_set(&ev, sockfd, kind, event_cb, retriever);
evset = true;
event_add(&ev, NULL);
}
CURL* easy;
char* url;
char error[CURL_ERROR_SIZE];
int action;
event ev; // event monitoring connection
bool evset;
};
void Retriever::check_completion()
{
fprintf(MSG_OUT, "REMAINING: %d\n", still_running);
CURLMsg* msg;
int msgs_left;
while ((msg = curl_multi_info_read(multi, &msgs_left))) {
if (msg->msg == CURLMSG_DONE) {
CURL* easy = msg->easy_handle;
CURLcode res = msg->data.result;
Connection* conn;
curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
char *eff_url;
curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
curl_multi_remove_handle(multi, easy);
curl_easy_cleanup(easy);
delete conn;
}
}
}
/* Update the event timer after curl_multi library calls.
 *
 * multi      - the multi handle (unused).
 * timeout_ms - delay libcurl asks for, in milliseconds; -1 asks for the
 *              timer to be removed rather than armed.
 * data       - the Retriever registered with CURLMOPT_TIMERDATA.
 *
 * Always returns 0.
 */
static int multi_timer_cb(CURLM* multi, long timeout_ms, void* data)
{
  Retriever* retr = (Retriever*)data;
  (void)multi; /* unused */
  fprintf(MSG_OUT, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
  if (timeout_ms < 0) {
    /* -1 means "delete the timer"; converting it would yield a negative
       tv_usec and arm a bogus timeout instead */
    if (evtimer_pending(&retr->timer_event, NULL))
      evtimer_del(&retr->timer_event);
    return 0;
  }
  struct timeval timeout;
  timeout.tv_sec = timeout_ms / 1000;
  timeout.tv_usec = (timeout_ms % 1000) * 1000;
  /* includes a timeout of zero, which fires as soon as possible */
  evtimer_add(&retr->timer_event, &timeout);
  return 0;
}
/* Die if we get a bad CURLMcode somewhere */
static void check_mcode(const char* where, CURLMcode code)
{
if (CURLM_OK != code) {
const char* s = curl_multi_strerror(code);
fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
if (CURLM_BAD_SOCKET != code) // ignore this error
exit(code);
}
}
/* Called by libevent when we get action on a multi socket */
static void event_cb(int fd, short kind, void* userp)
{
Retriever* retr = (Retriever*)userp;
int action =
(kind & EV_READ ? CURL_CSELECT_IN : 0) |
(kind & EV_WRITE ? CURL_CSELECT_OUT : 0);
CURLMcode rc = curl_multi_socket_action(retr->multi, fd, action,
&retr->still_running);
check_mcode("event_cb: curl_multi_socket_action", rc);
retr->check_completion();
if (retr->still_running <= 0) {
fprintf(MSG_OUT, "last transfer done, kill timeout\n");
if (evtimer_pending(&retr->timer_event, NULL))
evtimer_del(&retr->timer_event);
}
}
/* Called by libevent when our timeout expires */
static void timer_cb(int fd, short kind, void* userp)
{
Retriever* retr = (Retriever*)userp;
(void)fd;
(void)kind;
// set second argument to CURL_SOCKET_TIMEOUT
// (@see http://curl.haxx.se/libcurl/c/curl_multi_socket_action.html)
CURLMcode rc = curl_multi_socket_action(retr->multi,
CURL_SOCKET_TIMEOUT, 0,
&retr->still_running);
check_mcode("timer_cb: curl_multi_socket_action", rc);
retr->check_completion();
}
/* CURLMOPT_SOCKETFUNCTION
 *
 * e     - easy handle the socket belongs to.
 * s     - the socket.
 * what  - CURL_POLL_* value telling what to wait for, or CURL_POLL_REMOVE.
 * cbp   - the Retriever registered with CURLMOPT_SOCKETDATA.
 * sockp - per-socket pointer (unused, nothing is ever assigned).
 *
 * Looks up the Connection stored in the easy handle's private pointer and
 * adds, changes or removes its libevent registration. Always returns 0.
 */
static int sock_cb(CURL* e, curl_socket_t s, int what, void* cbp, void* sockp)
{
  Retriever* retr = (Retriever*)cbp;
  static const char *whatstr[] = { "none", "IN", "OUT", "INOUT", "REMOVE" };
  (void)sockp; /* unused */

  /* never index the name table with a value outside its range */
  const char* what_name =
    (what >= 0 && what <= CURL_POLL_REMOVE) ? whatstr[what] : "?";
  fprintf(MSG_OUT,
          "socket callback: s=%d e=%p what=%s ", (int)s, (void*)e, what_name);

  /* stays NULL when the lookup fails or no Connection was attached */
  Connection* conn = NULL;
  curl_easy_getinfo(e, CURLINFO_PRIVATE, &conn);
  if (!conn) {
    fprintf(MSG_OUT, "(no Connection attached, ignored)\n");
    return 0;
  }

  if (what == CURL_POLL_REMOVE) {
    fprintf(MSG_OUT, "\n");
    if (conn->evset)
      event_del(&conn->ev);
  } else {
    if (!conn->evset)
      // first call
      fprintf(MSG_OUT, "Sending data: %s\n", what_name);
    else
      fprintf(MSG_OUT,
              "Changing action from %s to %s\n",
              whatstr[conn->action], what_name);
    conn->setevent(s, what, retr);
  }
  return 0;
}
/* CURLOPT_WRITEFUNCTION
 *
 * ptr   - the received data (ignored, this demo discards everything).
 * size  - size of one element.
 * nmemb - number of elements.
 * data  - pointer registered with CURLOPT_WRITEDATA, i.e. the Connection
 *         that owns the transfer (unused here).
 *
 * Returns size * nmemb, reporting the whole chunk as consumed.
 */
static size_t write_cb(void* ptr, size_t size, size_t nmemb, void* data)
{
  (void)ptr;
  (void)data;
  return size * nmemb;
}
/* CURLOPT_PROGRESSFUNCTION */
static int prog_cb(void* p, double dltotal, double dlnow, double ult,
double uln)
{
Connection *conn = (Connection *)p;
(void)ult;
(void)uln;
fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
return 0;
}
/* This gets called whenever data is received from the fifo.
 *
 * fd    - descriptor of the fifo.
 * event - libevent flags (unused).
 * arg   - the Retriever that new transfers are added to.
 *
 * Every non-empty line read from the fifo is treated as a URL, and a new
 * Connection is created for it.
 */
static void fifo_cb(int fd, short event, void* arg)
{
  /* The stdio stream wrapping the fifo is created on the first call and then
     reused: opening a new one per callback would leak a FILE each time, and
     it cannot be fclose()d without closing the fifo itself. This assumes the
     callback is only ever registered for a single descriptor, as main()
     does. */
  static FILE* fp = NULL;
  Retriever* r = (Retriever*)arg;
  (void)event; /* unused */

  if (!fp) {
    fp = fdopen(fd, "r");
    if (!fp) {
      perror("fdopen");
      exit(2);
    }
  }

  char *line = NULL;
  size_t len = 0;
  ssize_t nread;
  while ((nread = getline(&line, &len, fp)) != -1) {
    /* trim the line terminator, but only if there actually is one */
    while (nread > 0 && (line[nread-1] == '\n' || line[nread-1] == '\r'))
      line[--nread] = '\0';
    if (nread == 0)
      continue; /* blank line, nothing to fetch */
    /* if we read a URL, go get it! */
    Connection* conn = new Connection(line, r);
    fprintf(MSG_OUT,
            "Adding easy %p to multi %p (%s)\n",
            (void*)conn->easy, (void*)r->multi, line);
  }
  free(line);

  /* The fifo is opened non-blocking, so the loop above ends when the read
     fails for lack of data. Reset the stream's error/EOF indicators so the
     reused stream can be read again on the next callback. */
  clearerr(fp);
}
/* Create the named pipe "hiper.fifo" and open it.
 *
 * Refuses to continue when a regular file of that name exists; anything
 * else with that name is unlinked and replaced by a fresh fifo.
 *
 * Returns the open descriptor (read/write, non-blocking), or -1 after
 * reporting the failure with perror().
 */
static int init_fifo()
{
  static const char *fifo = "hiper.fifo";
  struct stat info;

  fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);

  /* do not clobber a regular file that happens to have our name */
  if (lstat(fifo, &info) == 0 && S_ISREG(info.st_mode)) {
    errno = EEXIST;
    perror("lstat");
    return -1;
  }

  unlink(fifo);
  if (mkfifo(fifo, 0600) != 0) {
    perror("mkfifo");
    return -1;
  }

  int fd = open(fifo, O_RDWR | O_NONBLOCK, 0);
  if (fd < 0) {
    perror("open");
    return -1;
  }

  fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
  return fd;
}
int main(int argc, char **argv)
{
Retriever retr;
(void)argc;
(void)argv;
event_init();
evtimer_set(&retr.timer_event, timer_cb, &retr);
/* create fifo */
int sockfd = init_fifo();
if (sockfd == -1)
return -1;
/* tell libevent to monitor it */
event fifo_event;
event_set(&fifo_event, sockfd, EV_READ | EV_PERSIST, fifo_cb, &retr);
event_add(&fifo_event, NULL);
/* we don't call any curl_multi_socket*() function yet as we have no handles
added! */
event_dispatch();
return 0;
}
-------------------------------------------------------------------
List admin: http://cool.haxx.se/list/listinfo/curl-library
Etiquette: http://curl.haxx.se/mail/etiquette.html