Am Tuesday 10 April 2012 schrieb Micah Cowan:
> > Well, since there is no response to my previous post: is there any
> > interest in getting that done anyway ?
> 
> There's interest, sure enough. But this concurrency stuff was meant to
> be a Google Summer of Code project, so someone already getting started
> on (and completing a proof-of-concept for) these things leaves us in a
> bit of a weird place with regard to the current Summer of Code
> applicants we're sifting through.

Oops. Didn't think about that.

> But perhaps you can post what you've done so far, and we can take a look
> at what there is, and what remains, and whether a summer-of-code student
> could adapt their needed work to fill in the gaps...

I stripped off some already begun stuff and added some comments. So now the 
code might be used as a kind of workbench for testing concurrency and/or 
metalink stuff.

The code is written from scratch.
I put it under CC0 License (GNU compatible), so everybody can do what 
she/he/it wants with it.

> As to HTTP header stuff... I had a hunch before that no one is using it
> much in practice, especially since it's a newer spec. But I'd imagine
> metalinker.org might; or if not, someone there could probably point you
> at a test server somewhere, or something.

Let the students care about that ;-)

If I can help out with something regarding the project, drop me a mail.

     Tim
/*
 * (c)2012 Tim Ruehsen
 *
 * Source Code License
 *   CC0 1.0 Universal (CC0 1.0) Public Domain Dedication
 *   http://creativecommons.org/publicdomain/zero/1.0/legalcode
 *
 * proof of concept for a threaded concurrent download design
 *
 * Changelog
 * 06.04.2012  Tim Ruehsen  created
 *
 * Hints:
 * The code is completely written from scratch and can be used as
 * a starting point.
 * Downloads are just 'simulated' by a random sleep.
 * Job queue and blacklist are implemented as simply as possible (using lists).
 * There is no metalink code, no network code, no html parsing, no IRI/URI parsing.
 * All that should be done as a Google Summer of Code project.
 *
 * Network, parsing, URI code is available within the wget code.
 * Metalink code has to be written (see RFC 5854 / 6249)
 *
 * Once everything is implemented and working, the code should be
 * integrated into wget.
 *
 * Compile and link with
 *   gcc -Wall -Wextra -O2 wget_proto.c -o wget_proto -lpthread
 *
 */

#include <pthread.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <errno.h>
#include <ctype.h>

#include <sys/types.h>
#include <sys/socket.h>

// number of downloader threads started by main()
#define NUM_DOWNLOADER 3

// free() the memory a points to and reset a to NULL.
// a is evaluated twice, so it must be an lvalue without side effects.
// There is deliberately no semicolon after 'while (0)': the caller supplies
// it, which keeps 'if (x) xfree(p); else ...' valid.
#define xfree(a) do { free((void *)(a)); (a)=NULL; } while (0)

// Link header of a doubly linked list.
// BLNODE and QNODE embed it as their first member, which is why the list
// functions can be used on them through pointer casts.
typedef struct LISTNODE LISTNODE;
struct LISTNODE {
	LISTNODE *next; // following node, NULL at the end of the list
	LISTNODE *prev; // preceding node, NULL for the head of the list
};

// Pushes node onto the front of list and returns the new head of the list.
//
// list: current head of the list, NULL for an empty list
// node: node to insert; if NULL, a bare LISTNODE is allocated with malloc()
//
// Returns the inserted node (the new head), or NULL if node was NULL and
// the allocation failed; the list is left untouched in that case.
static LISTNODE *list_add(LISTNODE *list, LISTNODE *node)
{
	if (!node) {
		node=malloc(sizeof(*node));
		if (!node)
			return NULL;
	}

	// the new node becomes the head, so it must not have a predecessor
	// (list_del() relies on prev==NULL to recognize the head)
	node->prev=NULL;
	node->next=list;
	if (list)
		list->prev=node;

	return node;
}

// Unlinks node from the list whose head pointer is *list and frees the node.
// *list is updated when the removed node was the head.
static void list_del(LISTNODE **list, LISTNODE *node)
{
	LISTNODE *before=node->prev;
	LISTNODE *after=node->next;

	// close the gap on the predecessor side, or advance the head
	if (before)
		before->next=after;
	else
		*list=after;

	// close the gap on the successor side
	if (after)
		after->prev=before;
	else if (*list==node)
		*list=NULL;

	free(node);
}

// Blacklist entry (simplistic LIFO list): new entries are pushed at the head.
typedef struct BLNODE BLNODE;
struct BLNODE {
	LISTNODE list; // list linkage; first member, so BLNODE * and LISTNODE * can be cast
	char *uri;     // blacklisted URI (strdup() copy made by blacklist_add())
};

// Job queue entry (simplistic LIFO list): new jobs are pushed at the head.
typedef struct QNODE QNODE;
struct QNODE {
	LISTNODE list; // list linkage; first member, so QNODE * and LISTNODE * can be cast
	char *data;    // URI of the job (strdup() copy made by queue_add())
	char chunk;    // set to 1 by main() for jobs added via 'add chunk', 0 otherwise
	char inuse;    // set to 1 by queue_get() once the job has been handed out
};

// State of one downloader thread.
typedef struct {
	pthread_t tid; // thread id, written by pthread_create() and by the thread itself
	QNODE *node;   // job currently assigned by main(), NULL if none
	int sockfd[2]; // socketpair: [0] is used by main(), [1] by the downloader thread
} DOWNLOADER;

// one slot per downloader thread
static DOWNLOADER downloader[NUM_DOWNLOADER];

// both functions are defined further down in this file
static void *downloader_thread(void *p);
static int fdgetline0(char **buf, size_t *bufsize, int fd);


// head of the job queue, NULL while the queue is empty
static QNODE *queue;
// head of the blacklist, NULL while the blacklist is empty
static BLNODE *blacklist;

// Pushes a copy of uri onto the blacklist and returns the new list head.
//
// uri must be a valid, NUL-terminated string.
// Running out of memory is treated as fatal: a message is printed and the
// process exits, so the function never returns NULL.
static BLNODE *blacklist_add(const char *uri)
{
	BLNODE *node=calloc(1,sizeof(*node));
	char *copy=strdup(uri);

	// never hand a NULL node to list_add(): it would allocate a bare
	// LISTNODE, which is too small to be used as a BLNODE
	if (!node || !copy) {
		fprintf(stderr,"blacklist_add: out of memory\n");
		exit(EXIT_FAILURE);
	}

	node->uri=copy;
	blacklist=(BLNODE *)list_add((LISTNODE *)blacklist,(LISTNODE *)node);

	return blacklist;
}

static int in_blacklist(const char *uri)
{
	LISTNODE *current;

	for (current=(LISTNODE *)blacklist;current;current=current->next) {
		if (!strcmp(uri,((BLNODE *)current)->uri)) {
			return 1;
		}
	}

	return 0;
}

// Pushes a new job for uri onto the job queue and returns it (the new head).
// The job starts with chunk==0 and inuse==0.
//
// uri must be a valid, NUL-terminated string; it is copied.
// Running out of memory is treated as fatal: a message is printed and the
// process exits, so the function never returns NULL.
static QNODE *queue_add(const char *uri)
{
	QNODE *node=calloc(1,sizeof(*node));
	char *copy=strdup(uri);

	// never hand a NULL node to list_add(): it would allocate a bare
	// LISTNODE, which is too small to be used as a QNODE
	if (!node || !copy) {
		fprintf(stderr,"queue_add: out of memory\n");
		exit(EXIT_FAILURE);
	}

	node->data=copy;
	queue=(QNODE *)list_add((LISTNODE *)queue,(LISTNODE *)node);

	return queue;
}

// Removes node from the job queue and releases it together with its URI string.
static void queue_del(QNODE *node)
{
	free(node->data);
	node->data=NULL;

	// the LISTNODE is the first member of QNODE
	list_del((LISTNODE **)&queue,&node->list);
}

static QNODE *queue_get(void)
{
	LISTNODE *current;

	for (current=(LISTNODE *)queue;current;current=current->next) {
		QNODE *node=(QNODE *)current;
		if (!node->inuse) {
			node->inuse=1;
			return node;
		}
	}

	return NULL;
}


int main(int argc, char **argv)
{
	int n, rc, maxfd, nfds;
	char *buf=NULL;
	size_t bufsize=0;
	ssize_t nbytes;
	pthread_attr_t	attr;
	fd_set rset;

	// create job list from arguments (arguments are URIs)
	for (n=1;n<argc;n++) {
		queue_add(argv[n]);
	}

	// start all 'downloaders' as threads
	for (n=0;n<NUM_DOWNLOADER;n++) {
		// create two-way communication path
		socketpair(AF_UNIX,SOCK_STREAM,0,downloader[n].sockfd);

		// reading & writing must not block
		fcntl(downloader[n].sockfd[0],F_SETFL,O_NDELAY);
		fcntl(downloader[n].sockfd[1],F_SETFL,O_NDELAY);

		// init thread attributes
		pthread_attr_init(&attr);
		pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED);
		pthread_attr_setschedpolicy(&attr,SCHED_OTHER);

		// start downloader thread
		if ((rc=pthread_create(&downloader[n].tid, &attr, downloader_thread, &downloader[n]))!=0) {
			fprintf(stderr,"Failed to start downloader, error %d\n", rc);
			close(downloader[n].sockfd[0]);
			close(downloader[n].sockfd[1]);
		}
	}

	// talk with downloaders until everything is downloaded
	while (queue) {
		FD_ZERO(&rset);
		for (maxfd=n=0;n<NUM_DOWNLOADER;n++) {
			FD_SET(downloader[n].sockfd[0], &rset);
			if (downloader[n].sockfd[0]>maxfd)
				maxfd=downloader[n].sockfd[0];
		}

		// set timeout here (infinite for now)
		if ((nfds=select(maxfd+1, &rset, NULL, NULL, NULL))<=0) {
			// timeout or error
			fprintf(stderr,"select failed, error %d\n", errno);
			if (nfds==-1 && errno==EINTR) break;
			continue;
		}

		for (n=0;n<NUM_DOWNLOADER && nfds>0;n++) {
			if (FD_ISSET(downloader[n].sockfd[0],&rset)) {
				while ((nbytes=fdgetline0(&buf,&bufsize,downloader[n].sockfd[0]))>0) {
					// printf("- %s\n",buf);
					if (!strncmp(buf,"sts ",4)) {
						// new status message from downloader
						// TODO: nice status display (maybe using ncurses ?)
						printf("status '%s'\n",buf+4);
					}
					else if (!strcmp(buf,"get")) {
						QNODE *node=queue_get();
						if (node) {
							downloader[n].node=node;
							dprintf(downloader[n].sockfd[0],"set %d %s\n",node->chunk,node->data);
						}
					}
					else if (!strncmp(buf,"add chunk ",10)) {
						QNODE *node=queue_add(buf+10);
						node->chunk=1;
					}
					else if (!strncmp(buf,"add uri ",8)) {
						if (!in_blacklist(buf+8)) {
							QNODE *node=queue_add(buf+8);
							node->chunk=0;
						}
					}
					else if (!strcmp(buf,"done")) {
						// downloading done
						if (!downloader[n].node->chunk)
							blacklist_add(downloader[n].node->data);
						// else // TODO: merging chunks, chechksumming

						queue_del(downloader[n].node);
						downloader[n].node=NULL;
					}
				}
				nfds--;
			}
		}
	}

	// stop downloaders
	for (n=0;n<NUM_DOWNLOADER;n++) {
		close(downloader[n].sockfd[0]);
		close(downloader[n].sockfd[1]);
		// pthread_kill(downloader[n].tid,SIGTERM);
	}

	return EXIT_SUCCESS;
}

// Thread function of one 'downloader'.
//
// p points to the DOWNLOADER slot of this thread. The thread talks to main()
// through sockfd[1] of that slot using line based messages:
//   sent:     'get', 'done', 'sts <text>', 'add chunk <uri>'
//   received: 'set <chunk> <uri>'
// Downloads are only simulated by a random sleep.
//
// The loop is only left when select() fails with an error other than EINTR.
// Always returns NULL.
void *downloader_thread(void *p)
{
	DOWNLOADER *downloaderp=p;
	char *buf=NULL;
	size_t bufsize=0;
	fd_set rset;
	int nfds, n, chunk, pos;
	int sockfd=downloaderp->sockfd[1];
	// pthread_t is an opaque type, so it is not used for arithmetic here;
	// the socket descriptor differs for every downloader as well
	unsigned int seed=(unsigned int)time(NULL)^(unsigned int)sockfd;

	// i saw threads starting before pthread_create() came back and before &tid was initialized.
	// so, to avoid race conditions with pthread_create(), set tid here as well
	downloaderp->tid=pthread_self();

	dprintf(sockfd,"get\n");

	for (;;) {
		FD_ZERO(&rset);
		FD_SET(sockfd, &rset);

		// set timeout here (infinite for now)
		if ((nfds=select(sockfd+1, &rset, NULL, NULL, NULL))<=0) {
			// interrupted by a signal: nothing is wrong, just try again
			if (nfds==-1 && errno==EINTR) continue;

			// timeout or error
			fprintf(stderr,"select failed, error %d\n", errno);
			if (nfds==-1) break;
			continue;
		}

		while (fdgetline0(&buf,&bufsize,sockfd)>0) {
			// printf("+ %s\n",buf);
			// %n stores the offset of the URI that follows the chunk flag
			if (sscanf(buf,"set %d %n",&chunk,&pos)>=1) {
				char *uri=buf+pos;

				if (!chunk) {
					dprintf(sockfd,"sts check metalink for %s...\n",uri);

					// TODO: here we determine the number of chunks (METALINK RFC 5854 / 6249)
					int nchunks = rand_r(&seed)%5;

					if (nchunks) {
						// create new jobs for chunk downloading here
						dprintf(sockfd,"sts got %d chunks for %s...\n",nchunks,uri);
						for (n=0;n<nchunks;n++) {
							dprintf(sockfd,"add chunk %s.%d\n",uri,n);
						}

						dprintf(sockfd,"done\n");
						dprintf(sockfd,"get\n");
						continue;
					}
				}

				// regular download (complete file)
				dprintf(sockfd,"sts Downloading %s...\n",uri);
				sleep(1+rand_r(&seed)%10);
				dprintf(sockfd,"sts %s downloaded\n",uri);

				dprintf(sockfd,"done\n");

				// TODO: if running in recursive mode: parse html, css, etc. for links and
				// add them to the job queue via 'add uri <uri>'
				// dprintf(sockfd,"add uri %s\n",new_uri);

				dprintf(sockfd,"get\n");
			}
		}
	}

	// the line buffer is owned by this thread
	free(buf);

	return NULL;
}

// similar to getline(), but:
// - using a file descriptor
// - returns line without trailing \n
//
// buf, bufsize: address of the line buffer and of its size. Start with
//   *buf==NULL and *bufsize==0; the buffer is allocated and grown here and
//   has to be released by the caller with free().
//   The last 2*sizeof(size_t) bytes of the buffer hold the offset and the
//   number of bytes that were read but not returned yet. Because of that
//   hidden state, a buffer must only be used with one file descriptor and
//   must not be modified by the caller.
// fd: descriptor to read from (may be non-blocking)
//
// Returns the length of the line now stored NUL-terminated in *buf (0 for an
// empty line). Returns -1 if no complete line is available (read() returned
// 0 or an error such as EAGAIN), if memory could not be allocated or if buf
// or bufsize is NULL. Bytes of an incomplete line are kept and completed by
// the following calls.
int fdgetline0(char **buf, size_t *bufsize, int fd)
{
	const size_t reserved=2*sizeof(size_t); // room for the bookkeeping trailer
	ssize_t nbytes;
	size_t *sizep, length=0;
	char *p;

	if (!buf || !bufsize)
		return -1;

	if (!*buf || !*bufsize) {
		// first call
		if (!(p=malloc(32)))
			return -1;

		*buf=p;
		*bufsize=32;
		sizep=(size_t *)(*buf+*bufsize-reserved);
		sizep[0]=sizep[1]=0;
	} else {
		sizep=(size_t *)(*buf+*bufsize-reserved);
		if (sizep[1]) {
			// take care of remaining data from last call
			length=sizep[1];
			memmove(*buf,*buf+sizep[0],length);
			sizep[0]=sizep[1]=0;

			if ((p=memchr(*buf,'\n',length))) {
				*p++=0;
				sizep[0]=p-*buf; // position of extra chars
				sizep[1]=length-sizep[0]; // number of extra chars
				return (int)(sizep[0]-1); // length of line in *buf
			}
		}
	}

	for (;;) {
		// never read into the trailer
		size_t avail=*bufsize-reserved-length;

		if (!avail) {
			// data area is full: double the buffer
			size_t newsize=*bufsize*2;
			char *grown=realloc(*buf,newsize);

			if (!grown)
				break; // *buf is still valid, keep what we have

			*buf=grown;
			*bufsize=newsize;
			// the trailer lives at the end of the new buffer; the old
			// trailer bytes are now part of the data area
			sizep=(size_t *)(*buf+*bufsize-reserved);
			sizep[0]=sizep[1]=0;
			continue;
		}

		if ((nbytes=read(fd,*buf+length,avail))<=0)
			break;

		length+=nbytes;
		if ((p=memchr(*buf+length-nbytes,'\n',nbytes))) {
			*p++=0;
			sizep[0]=p-*buf; // position of extra chars
			sizep[1]=length-sizep[0]; // number of extra chars
			return (int)(sizep[0]-1); // length of line in *buf
		}
	}

	// no complete line yet: remember the partial line for the next call
	sizep[0]=0;
	sizep[1]=length;

	return -1;
}

Reply via email to