Am Tuesday 10 April 2012 schrieb Micah Cowan:
> > Well, since there is no response to my previous post: is there any
> > interest in getting that done anyway ?
>
> There's interest, sure enough. But this concurrency stuff was meant to
> be a Google Summer of Code project, so someone already getting started
> on (and completing a proof-of-concept for) these things leaves us in a
> bit of a weird place with regard to the current Summer of Code
> applicants we're sifting through.
Oops. Didn't think about that.
> But perhaps you can post what you've done so far, and we can take a look
> at what there is, and what remains, and whether a summer-of-code student
> could adapt their needed work to fill in the gaps...
I stripped off some already begun stuff and added some comments. So now the
code might be used as a kind of workbench for testing concurrency and/or
metalink stuff.
The code is written from scratch.
I put it under CC0 License (GNU compatible), so everybody can do what
she/he/it wants with it.
> As to HTTP header stuff... I had a hunch before that no one is using it
> much in practice, especially since it's a newer spec. But I'd imagine
> metalinker.org might; or if not, someone there could probably point you
> at a test server somewhere, or something.
Let the students care about that ;-)
If I can help out with something regarding the project, drop me a mail.
Tim
/*
* (c)2012 Tim Ruehsen
*
* Source Code License
* CC0 1.0 Universal (CC0 1.0) Public Domain Dedication
* http://creativecommons.org/publicdomain/zero/1.0/legalcode
*
* proof of concept for a threaded concurrent download design
*
* Changelog
* 06.04.2012 Tim Ruehsen created
*
* Hints:
* The code is written completely from scratch and can be used as
* a starting point.
* Downloads are just 'simulated' by a random sleep.
* Job queue and blacklist are implemented as simply as possible (using lists).
* There is no metalink code, no network code, no html parsing, no IRI/URI parsing.
* All that should be done as a Google Summer of Code project.
*
* Network, parsing, URI code is available within the wget code.
* Metalink code has to be written (see RFC 5854 / 6249)
*
* Once everything is implemented and working, the code should be
* integrated into wget.
*
* Compile and link with
* gcc -Wall -Wextra -O2 wget_proto.c -o wget_proto -lpthread
*
*/
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
// number of downloader threads started by main()
#define NUM_DOWNLOADER 3
// free() the memory 'a' points to and reset 'a' to NULL, so that a stale
// pointer can neither be dereferenced nor freed a second time.
// 'a' must be a modifiable pointer lvalue; it is evaluated twice.
// There is deliberately no trailing semicolon: the caller supplies it,
// which keeps 'if (x) xfree(p); else ...' valid.
#define xfree(a) do { free((void *)(a)); (a)=NULL; } while (0)
// Link header of a doubly linked list.
// The payload structs in this file (BLNODE, QNODE) embed it as their first
// member, so a payload pointer can be cast to LISTNODE * and back.
typedef struct LISTNODE LISTNODE;
struct LISTNODE {
	LISTNODE *next; // node that was added before this one, NULL at the end of the list
	LISTNODE *prev; // node that was added after this one, NULL for the list head
};
// Push 'node' onto the front of 'list'.
//
// list: current list head, or NULL for an empty list
// node: node to link in; if NULL, a bare LISTNODE is allocated here
//
// Returns the new list head (always the inserted node), or NULL if no node
// was passed and the allocation failed; 'list' is left untouched in that case.
static LISTNODE *list_add(LISTNODE *list, LISTNODE *node)
{
	if (!node) {
		node=malloc(sizeof *node);
		if (!node)
			return NULL;
	}
	node->next=list;
	// the inserted node is the new head; list_del() recognizes the head by
	// prev==NULL, so prev must not be left uninitialized or stale
	node->prev=NULL;
	if (list)
		list->prev=node;
	return node;
}
// Unlink 'node' from the list whose head pointer is '*list' and free it.
//
// list: address of the list head pointer; updated when the head is removed
// node: node to remove; must be a member of that list
//
// Only the node's own memory is released. Anything the payload points to
// has to be freed by the caller beforehand.
// A NULL 'list' or 'node' is ignored.
static void list_del(LISTNODE **list, LISTNODE *node)
{
	if (!list || !node)
		return;
	if (node->prev)
		node->prev->next=node->next;
	else
		*list=node->next; // node was the head; for a single-node list this leaves *list==NULL
	if (node->next)
		node->next->prev=node->prev;
	free(node);
}
// Blacklist entry. The blacklist is kept as a plain list; new entries are
// pushed to the front (see blacklist_add()).
typedef struct BLNODE BLNODE;
struct BLNODE {
	LISTNODE list; // list linkage; first member, because BLNODE * is cast to LISTNODE *
	char *uri;     // copy of the URI, made with strdup() in blacklist_add()
};
// Job queue entry. The queue is kept as a plain list; new jobs are pushed to
// the front (see queue_add()).
typedef struct QNODE QNODE;
struct QNODE {
	LISTNODE list; // list linkage; first member, because QNODE * is cast to LISTNODE *
	char *data;    // copy of the job's URI, made with strdup() in queue_add()
	char chunk;    // set to 1 for jobs created by 'add chunk', 0 otherwise
	char inuse;    // set to 1 by queue_get() once the job has been handed out
};
// Control block of one downloader thread.
typedef struct {
	pthread_t tid; // thread id, stored by pthread_create() and by the thread itself
	QNODE *node;   // job currently assigned to this downloader, NULL if none
	int sockfd[2]; // socketpair: main() talks on [0], the thread on [1]
} DOWNLOADER;
// one control block per downloader thread
static DOWNLOADER downloader[NUM_DOWNLOADER];
// forward declarations of functions defined after main()
static void *downloader_thread(void *p);
static int fdgetline0(char **buf, size_t *bufsize, int fd);
// head of the job queue, NULL while the queue is empty
static QNODE *queue;
// head of the blacklist, NULL while the blacklist is empty
static BLNODE *blacklist;
// Put a copy of 'uri' at the front of the blacklist.
//
// Returns the new blacklist head (the entry just added), or NULL if 'uri'
// is NULL or memory is exhausted; the blacklist is unchanged on failure.
static BLNODE *blacklist_add(const char *uri)
{
	BLNODE *node;
	if (!uri)
		return NULL;
	// Allocate and fill the entry completely before linking it in. Handing a
	// NULL node to list_add() would make it allocate a bare LISTNODE, which
	// is too small to hold the uri member.
	if (!(node=calloc(1,sizeof *node)))
		return NULL;
	if (!(node->uri=strdup(uri))) {
		free(node);
		return NULL;
	}
	blacklist=(BLNODE *)list_add((LISTNODE *)blacklist,(LISTNODE *)node);
	return blacklist;
}
static int in_blacklist(const char *uri)
{
LISTNODE *current;
for (current=(LISTNODE *)blacklist;current;current=current->next) {
if (!strcmp(uri,((BLNODE *)current)->uri)) {
return 1;
}
}
return 0;
}
// Put a new job for 'uri' at the front of the job queue.
// The job starts with chunk==0 and inuse==0.
//
// Returns the new queue head (the job just added), or NULL if 'uri' is NULL
// or memory is exhausted; the queue is unchanged on failure.
static QNODE *queue_add(const char *uri)
{
	QNODE *node;
	if (!uri)
		return NULL;
	// Allocate and fill the job completely before linking it in. Handing a
	// NULL node to list_add() would make it allocate a bare LISTNODE, which
	// is too small to hold the QNODE members.
	if (!(node=calloc(1,sizeof *node)))
		return NULL;
	if (!(node->data=strdup(uri))) {
		free(node);
		return NULL;
	}
	queue=(QNODE *)list_add((LISTNODE *)queue,(LISTNODE *)node);
	return queue;
}
// Remove 'node' from the job queue and release it together with its URI copy.
static void queue_del(QNODE *node)
{
	LISTNODE **head=(LISTNODE **)&queue;
	// release the payload first, list_del() only frees the node itself
	free((void *)node->data);
	node->data=NULL;
	list_del(head,(LISTNODE *)node);
}
static QNODE *queue_get(void)
{
LISTNODE *current;
for (current=(LISTNODE *)queue;current;current=current->next) {
QNODE *node=(QNODE *)current;
if (!node->inuse) {
node->inuse=1;
return node;
}
}
return NULL;
}
// Hand the next unassigned job (if any) to downloader 'n' by sending it a
// 'set <chunk> <uri>' line.
// Returns 1 if a job was handed out, 0 if no unassigned job is queued.
static int assign_job(int n)
{
	QNODE *node=queue_get();
	if (!node)
		return 0;
	downloader[n].node=node;
	dprintf(downloader[n].sockfd[0],"set %d %s\n",node->chunk,node->data);
	return 1;
}
// Process one protocol line 'msg' received from downloader 'n'.
// '*idle' is set to 1 when the downloader asked for a job and none was
// available, so that main() can serve the request later.
static void handle_message(int n, const char *msg, int *idle)
{
	if (!strncmp(msg,"sts ",4)) {
		// new status message from downloader
		// TODO: nice status display (maybe using ncurses ?)
		printf("status '%s'\n",msg+4);
	}
	else if (!strcmp(msg,"get")) {
		*idle=!assign_job(n);
	}
	else if (!strncmp(msg,"add chunk ",10)) {
		QNODE *node=queue_add(msg+10);
		if (node)
			node->chunk=1;
		else
			fprintf(stderr,"Failed to queue %s\n",msg+10);
	}
	else if (!strncmp(msg,"add uri ",8)) {
		if (!in_blacklist(msg+8)) {
			QNODE *node=queue_add(msg+8);
			if (node)
				node->chunk=0;
			else
				fprintf(stderr,"Failed to queue %s\n",msg+8);
		}
	}
	else if (!strcmp(msg,"done")) {
		// downloading done; ignore a 'done' without an assigned job
		QNODE *node=downloader[n].node;
		if (node) {
			if (!node->chunk)
				blacklist_add(node->data);
			// else // TODO: merging chunks, checksumming
			queue_del(node);
			downloader[n].node=NULL;
		}
	}
}
// Entry point: every command line argument is taken as a URI and queued as a
// job. NUM_DOWNLOADER threads are started, each connected to main() through a
// socketpair, and main() serves their line based requests
// ('sts', 'get', 'add chunk', 'add uri', 'done') until the job queue is empty.
//
// Returns EXIT_SUCCESS when the queue has been worked off, EXIT_FAILURE if no
// downloader could be started or select() failed.
int main(int argc, char **argv)
{
	int n, rc, maxfd, nfds, started=0, status=EXIT_SUCCESS;
	// One line buffer per downloader: fdgetline0() keeps unread data of a
	// stream inside the buffer, so a buffer must not be shared between sockets.
	char *buf[NUM_DOWNLOADER]={NULL};
	size_t bufsize[NUM_DOWNLOADER]={0};
	// idle[n]!=0: downloader n sent 'get' but no job was available at that time
	int idle[NUM_DOWNLOADER]={0};
	pthread_attr_t attr;
	fd_set rset;
	// create job list from arguments (arguments are URIs)
	for (n=1;n<argc;n++) {
		if (!queue_add(argv[n]))
			fprintf(stderr,"Failed to queue %s\n",argv[n]);
	}
	// init thread attributes (shared by all downloaders)
	pthread_attr_init(&attr);
	pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
	pthread_attr_setschedpolicy(&attr,SCHED_OTHER);
	// start all 'downloaders' as threads
	for (n=0;n<NUM_DOWNLOADER;n++) {
		// -1 marks a downloader that is not running; such slots are skipped below
		downloader[n].sockfd[0]=downloader[n].sockfd[1]=-1;
		// create two-way communication path
		if (socketpair(AF_UNIX,SOCK_STREAM,0,downloader[n].sockfd)!=0) {
			fprintf(stderr,"Failed to create socketpair, error %d\n", errno);
			downloader[n].sockfd[0]=downloader[n].sockfd[1]=-1;
			continue;
		}
		// reading & writing must not block
		fcntl(downloader[n].sockfd[0],F_SETFL,O_NONBLOCK);
		fcntl(downloader[n].sockfd[1],F_SETFL,O_NONBLOCK);
		// start downloader thread
		if ((rc=pthread_create(&downloader[n].tid, &attr, downloader_thread, &downloader[n]))!=0) {
			fprintf(stderr,"Failed to start downloader, error %d\n", rc);
			close(downloader[n].sockfd[0]);
			close(downloader[n].sockfd[1]);
			downloader[n].sockfd[0]=downloader[n].sockfd[1]=-1;
			continue;
		}
		started++;
	}
	pthread_attr_destroy(&attr);
	if (!started && queue) {
		// nobody would ever work off the queue, select() would block forever
		fprintf(stderr,"No downloader could be started\n");
		return EXIT_FAILURE;
	}
	// talk with downloaders until everything is downloaded
	while (queue) {
		FD_ZERO(&rset);
		maxfd=-1;
		for (n=0;n<NUM_DOWNLOADER;n++) {
			int fd=downloader[n].sockfd[0];
			if (fd<0)
				continue;
			FD_SET(fd, &rset);
			if (fd>maxfd)
				maxfd=fd;
		}
		// set timeout here (infinite for now)
		if ((nfds=select(maxfd+1, &rset, NULL, NULL, NULL))<=0) {
			// timeout or interrupted by a signal: just try again
			if (nfds==0 || errno==EINTR)
				continue;
			// a real error would repeat immediately, so give up
			fprintf(stderr,"select failed, error %d\n", errno);
			status=EXIT_FAILURE;
			break;
		}
		for (n=0;n<NUM_DOWNLOADER && nfds>0;n++) {
			int fd=downloader[n].sockfd[0];
			if (fd<0 || !FD_ISSET(fd,&rset))
				continue;
			nfds--;
			while (fdgetline0(&buf[n],&bufsize[n],fd)>0) {
				// printf("- %s\n",buf[n]);
				handle_message(n,buf[n],&idle[n]);
			}
		}
		// Downloaders whose 'get' could not be served do not ask again;
		// serve them now in case jobs have been added in the meantime.
		for (n=0;n<NUM_DOWNLOADER;n++) {
			if (idle[n] && assign_job(n))
				idle[n]=0;
		}
	}
	// stop downloaders
	for (n=0;n<NUM_DOWNLOADER;n++) {
		if (downloader[n].sockfd[0]>=0) {
			close(downloader[n].sockfd[0]);
			close(downloader[n].sockfd[1]);
		}
		free(buf[n]);
		// pthread_kill(downloader[n].tid,SIGTERM);
	}
	return status;
}
void *downloader_thread(void *p)
{
DOWNLOADER *downloaderp=p;
char *buf=NULL;
size_t bufsize=0;
ssize_t nbytes;
fd_set rset;
int nfds, n, chunk, pos;
unsigned int seed=(unsigned int)(time(NULL)|pthread_self());
int sockfd=downloaderp->sockfd[1];
// i saw threads starting before pthread_create() came back and before &tid was initialized.
// so, to avoid race conditions with pthread_create(), set tid here as well
downloaderp->tid=pthread_self();
dprintf(sockfd,"get\n");
for (;;) {
FD_ZERO(&rset);
FD_SET(sockfd, &rset);
// set timeout here (infinite for now)
if ((nfds=select(sockfd+1, &rset, NULL, NULL, NULL))<=0) {
// timeout or error
fprintf(stderr,"select failed, error %d\n", errno);
if (nfds==-1) break;
continue;
}
while ((nbytes=fdgetline0(&buf,&bufsize,sockfd))>0) {
// printf("+ %s\n",buf);
if (sscanf(buf,"set %d %n",&chunk,&pos)>=1) {
char *uri=buf+pos;
if (!chunk) {
dprintf(sockfd,"sts check metalink for %s...\n",uri);
// TODO: here we determine the number of chunks (METALINK RFC 5854 / 6249)
int nchunks = rand_r(&seed)%5;
if (nchunks) {
// create new jobs for chunk downloading here
dprintf(sockfd,"sts got %d chunks for %s...\n",nchunks,uri);
for (n=0;n<nchunks;n++) {
dprintf(sockfd,"add chunk %s.%d\n",uri,n);
}
dprintf(sockfd,"done\n");
dprintf(sockfd,"get\n");
continue;
}
}
// regular download (complete file)
dprintf(sockfd,"sts Downloading %s...\n",uri);
sleep(1+rand_r(&seed)%10);
dprintf(sockfd,"sts %s downloaded\n",uri);
dprintf(sockfd,"done\n");
// TODO: if running in recursive mode: parse html, css, etc. for links and
// add them to the job queue via 'add uri <uri>'
// dprintf(sockfd,"add uri %s\n",new_uri);
dprintf(sockfd,"get\n");
}
}
}
return NULL;
}
// Bookkeeping record of fdgetline0(): two size_t values stored in the last
// 2*sizeof(size_t) bytes of the line buffer:
//   [0] offset of data that was read but not yet returned
//   [1] number of such bytes
// memcpy() is used so that no alignment of the buffer end is required.
static void fdgetline0_load(const char *buf, size_t bufsize, size_t *pos, size_t *len)
{
	const char *trailer=buf+bufsize-2*sizeof(size_t);
	memcpy(pos,trailer,sizeof *pos);
	memcpy(len,trailer+sizeof *pos,sizeof *len);
}
// Store the bookkeeping record described at fdgetline0_load().
static void fdgetline0_save(char *buf, size_t bufsize, size_t pos, size_t len)
{
	char *trailer=buf+bufsize-2*sizeof(size_t);
	memcpy(trailer,&pos,sizeof pos);
	memcpy(trailer+sizeof pos,&len,sizeof len);
}
// similar to getline(), but:
// - using a file descriptor
// - returns line without trailing \n
//
// buf:     address of the line buffer pointer; *buf==NULL on the first call.
//          The buffer is allocated and enlarged here, the caller free()s it.
// bufsize: address of the buffer size; *bufsize==0 on the first call
// fd:      descriptor to read from (may be non-blocking)
//
// Data read beyond the end of the returned line, as well as an incomplete
// line, is kept inside the buffer for the next call. Therefore a buffer must
// always be used with the same descriptor and must not be modified beyond
// the returned line.
//
// Returns the length of the 0-terminated line now at the start of *buf, or
// -1 if no complete line is available (no more data right now, end of file,
// read error, out of memory, NULL arguments).
int fdgetline0(char **buf, size_t *bufsize, int fd)
{
	const size_t reserved=2*sizeof(size_t); // size of the bookkeeping record
	size_t pos=0, length=0, scanned=0, capacity;
	ssize_t nbytes;
	char *p;
	if (!buf || !bufsize)
		return -1;
	if (!*buf || !*bufsize) {
		// first call
		char *fresh=malloc(32);
		if (!fresh)
			return -1;
		*buf=fresh;
		*bufsize=32;
	} else {
		// take care of remaining data from last call
		fdgetline0_load(*buf,*bufsize,&pos,&length);
		if (length && pos)
			memmove(*buf,*buf+pos,length);
	}
	capacity=*bufsize-reserved; // bytes usable for line data
	for (;;) {
		// only look at bytes that have not been searched before
		if ((p=memchr(*buf+scanned,'\n',length-scanned))) {
			size_t linelen=(size_t)(p-*buf);
			*p=0;
			// remember position and number of extra chars
			fdgetline0_save(*buf,*bufsize,linelen+1,length-linelen-1);
			return (int)linelen; // length of line in *buf
		}
		scanned=length;
		if (length==capacity) {
			// buffer is full: double it, the record is rewritten on return
			char *grown=NULL;
			if (*bufsize<=(size_t)-1/2)
				grown=realloc(*buf,*bufsize*2);
			if (!grown) {
				fdgetline0_save(*buf,*bufsize,0,length);
				return -1;
			}
			*buf=grown;
			*bufsize*=2;
			capacity=*bufsize-reserved;
		}
		do {
			nbytes=read(fd,*buf+length,capacity-length);
		} while (nbytes==-1 && errno==EINTR);
		if (nbytes<=0) {
			// keep the incomplete line for the next call
			fdgetline0_save(*buf,*bufsize,0,length);
			return -1;
		}
		length+=(size_t)nbytes;
	}
}