Hi,

First, I apologise: through an oversight I posted the same message to the
curl-users mailing list, whereas I should have posted it here.
Here is the message :-


I am using libcurl's multi interface (library version *7.57*) together with
epoll to fetch data in parallel from n HTTP/2 servers (assume n = 2).
I want to allow a total of, say, 200 ms to receive all the responses (from the
n HTTP/2 servers), after which I process them. Attached is a file containing
the code that uses the multi_socket_action API with epoll.
Output of curl_version(): *libcurl/7.57.0 OpenSSL/1.0.2g zlib/1.2.8
c-ares/1.10.0 nghttp2/1.29.0*

Here is how I use it :-
1) Create a multi handle and set options on it.
2) Create n easy handles
3) Set options on the easy handles
4) Add the n easy handles in the multi handle
5) Call multi_socket_action with CURL_SOCKET_TIMEOUT to start everything. I
wait a maximum of 200ms for the entire process using epoll.
6) Call curl_multi_remove_handle on each of the n easy handles.
7) *Goto Step 3*.

If all responses are received within 200 ms, I process them; otherwise (at
least one of them could not complete) I process the ones I did receive. That
is, 200 ms is the maximum time I want my application to wait for a response
from a designated server, after which I continue with the responses I received.

Now the problem is that whenever a server is unable to send a response within
200 ms (let's call this event a timeout), libcurl tears down the underlying TCP
connection established with this server. This adds the overhead of connection
establishment (which is considerable, since I am using HTTPS) to the next
transaction with the same server.

Technically, when *curl_multi_remove_handle* is called for the easy handle,
the transfer is not yet in the state *CURLM_STATE_COMPLETED*. It is therefore
marked "*premature*" (premature = (data->mstate < CURLM_STATE_COMPLETED)
? TRUE : FALSE;), and this premature connection is then closed in the
function *multi_done*.

Here is the comment that precedes the call to *Curl_disconnect* on the
connection in the function *multi_done*:
/*
     if premature is TRUE, it means this connection was said to be DONE before
     the entire request operation is complete and thus we can't know in what
     state it is for re-using, so we're forced to close it. In a perfect world
     we can add code that keep track of if we really must close it here or not,
     but currently we have no such detail knowledge.
*/

I can understand why tearing down the connection is needed in HTTP/1.1, but
why can't this behaviour be done away with in HTTP/2? HTTP/2 supports multiple
(multiplexed) streams over the same connection, and a client can cancel a
single stream with an RST_STREAM frame while the connection stays open. So,
*theoretically*, I could just "*discard*" the old stream (which timed out from
my application's perspective and could contain old/invalid data). I could then
start a new stream for subsequent transactions over the same ESTABLISHED
connection, instead of tearing down the TCP connection and bearing the
overhead of connection establishment again (which is certainly prohibitive in
my case). Is this change technically difficult, or is it not supported by the
protocol? Could you please explain the reasoning, as I am not a networking
expert? Please feel free to correct me.

Regards
kartik
#include <assert.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <poll.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <time.h>
#include <unistd.h>

#include <curl/curl.h>

/* Number of servers each thread queries in parallel (length of urls[]). */
#define MAX_URLS 2
/* Capacity of the epoll_event buffer filled by a single epoll_wait() call. */
#define MAX_EVENTS 1000
/* Upper bound on the worker thread count; sizes the pthread_t array in main(). */
#define MAX_THREADS 300

/*
 * Connect timeout in milliseconds applied to every easy handle via
 * CURLOPT_CONNECTTIMEOUT_MS. Declared long because curl_easy_setopt() reads
 * numeric options as long through its variadic argument; handing it an int
 * is undefined behaviour on LP64 platforms.
 */
const long conn_timeout = 100L;


/* Smaller of two ints; used to clamp the epoll_wait() timeout to the remaining budget. */
static int min(int a, int b) {
	if (b < a)
		return b;
	return a;
}

/*
 * Per-thread state. main() allocates one instance per worker thread and passes
 * it to do_task(); the same pointer is registered as the libcurl socket/timer
 * callback userdata and as the write-callback data (CURLOPT_WRITEDATA).
 */
struct thread_params {
	int th_id; // thread id (index of this entry in main()'s params array; used in log lines)
	int epoll_fd; // epoll fd that this thread's socket_callback registers sockets with
	int waiting_time; // timer time for next timeout, in ms, as last reported to timer_callback
	int timeouts; // collect number of timeouts (rounds whose transfers had not all finished)
	int iteration; // number of request rounds do_task() performs
	int bytes_received; // running total of body bytes seen by response_writer
	int time_left;// ms left of the current round's budget (req_timeout, argv[3])
};

/*
 * Endpoints queried in every round; easy handle i fetches urls[i].
 * Both the pointers and the strings are read-only.
 */
const char *const urls[MAX_URLS] = {
	"https://172.16.4.8:8080/bidder?wt=40",
	"https://172.16.4.9:8080/bidder?wt=59"
};

/*
 * libcurl CURLMOPT_SOCKETFUNCTION callback: mirror libcurl's interest in a
 * socket into this thread's epoll set.
 *
 * easy: transfer owning the socket (unused)
 * fd:   socket libcurl wants monitored or forgotten
 * what: CURL_POLL_NONE / _IN / _OUT / _INOUT / _REMOVE
 * u:    CURLMOPT_SOCKETDATA, the owning struct thread_params
 * s:    per-socket pointer from curl_multi_assign() (unused)
 *
 * Always returns 0; epoll_ctl() failures are logged, not propagated.
 */
int socket_callback(CURL* easy, curl_socket_t fd, int what, void* u, void* s)
{
	(void) easy;
	(void) s;
	struct thread_params* params = (struct thread_params*)u;
	int epollFd = params->epoll_fd;
	struct epoll_event event;

	memset(&event, 0, sizeof event);
	event.data.fd = fd;

	if (what == CURL_POLL_REMOVE) {
		/* Result deliberately ignored: a failing DEL (fd already gone from
		 * the set) needs no action here. A non-NULL event pointer keeps
		 * pre-2.6.9 kernels happy. */
		(void) epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, &event);
		return 0;
	}

	if (what & CURL_POLL_IN)
		event.events |= EPOLLIN;
	if (what & CURL_POLL_OUT)
		event.events |= EPOLLOUT;

	/* CURL_POLL_NONE leaves event.events == 0, which is a valid registration
	 * (only EPOLLERR/EPOLLHUP get reported); it must not abort the process. */
	int res = epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &event);
	if (res == -1 && errno == EEXIST)  /* already registered: update interest */
		res = epoll_ctl(epollFd, EPOLL_CTL_MOD, fd, &event);
	if (res == -1)
		fprintf(stderr, "ERROR epoll_ctl(ADD/MOD) fd=%d: %s\n", (int) fd, strerror(errno));
	return 0;
}

/*
 * libcurl CURLMOPT_TIMERFUNCTION callback: record how long libcurl wants the
 * application to wait before calling
 * curl_multi_socket_action(CURL_SOCKET_TIMEOUT).
 *
 * The delay is stored in params->waiting_time (ms); execute_epoll() uses it
 * as the epoll_wait() timeout and drives libcurl from there. The callback
 * itself does not call back into libcurl. It can be invoked from inside other
 * libcurl calls (e.g. curl_multi_add_handle()), and libcurl's documentation
 * advises against calling curl_multi_socket_action() from within it. A 0 ms
 * timeout is therefore just recorded: the loop's epoll_wait(0) returns
 * immediately and the loop performs the socket action.
 *
 * timeout_ms: -1 means "no timer pending"; otherwise the delay in ms.
 * Always returns 0.
 */
int timer_callback(CURLM* multi, long timeout_ms, void* u)
{
	(void) multi;
	struct thread_params* params = (struct thread_params*)u;

	if (timeout_ms < 0 || timeout_ms > INT_MAX) {
		/* No timer (or one beyond int range): INT_MAX lets the caller's
		 * min() with the remaining budget decide the wait. Leaving a stale
		 * value (e.g. 0) here would make the epoll loop spin, and a negative
		 * one would make epoll_wait() block forever. */
		params->waiting_time = INT_MAX;
	} else {
		params->waiting_time = (int) timeout_ms;
	}
	return 0;
}

/*
 * libcurl CURLOPT_WRITEFUNCTION callback: count and discard response body bytes.
 *
 * The signature matches curl_write_callback exactly. libcurl calls the
 * function through that type, so a different return or parameter type would
 * be undefined behaviour.
 *
 * ptr:      received data (not inspected)
 * size:     element size (libcurl always passes 1)
 * nmemb:    number of elements
 * userdata: CURLOPT_WRITEDATA, the owning struct thread_params
 *
 * Returns size * nmemb to tell libcurl every byte was consumed.
 */
size_t response_writer(char *ptr, size_t size, size_t nmemb, void* userdata) {
	(void) ptr;
	size_t len = size * nmemb;
	struct thread_params* params = (struct thread_params*)userdata;
	/* libcurl delivers at most CURL_MAX_WRITE_SIZE bytes per call, so len fits in int. */
	params->bytes_received += (int) len;
	// Don't worry about the response right now
	return len;
}

/*
 * Apply the options shared by every easy handle.
 *
 * easy:        handle to configure
 * resp_buff:   passed as CURLOPT_WRITEDATA to response_writer (callers pass
 *              the thread's struct thread_params)
 * req_timeout: unused. NOTE(review): it is not mapped to CURLOPT_TIMEOUT_MS,
 *              which would make libcurl abort the transfer itself; the budget
 *              is enforced by the epoll loop instead.
 *
 * Each failing setopt is reported on stderr, and configuration continues so
 * one unsupported option does not skip the rest. Returns CURLE_OK if every
 * option was accepted, otherwise the first error encountered.
 */
int set_curl_common_option(CURL *easy, void* resp_buff, int req_timeout) {
	(void) req_timeout;
	CURLcode first_err = CURLE_OK;

	/*
	 * curl_easy_setopt() is variadic and reads numeric options as long and
	 * callbacks/data as pointers, hence the L suffixes and casts below:
	 * passing a plain int or enum constant there is undefined on LP64.
	 * __LINE__ expands at each SET_OPT use, so messages point at the option.
	 */
#define SET_OPT(opt, val)                                                       \
	do {                                                                        \
		CURLcode rc_easy = curl_easy_setopt(easy, (opt), (val));                \
		if (rc_easy != CURLE_OK) {                                              \
			fprintf(stderr,"\nERROR curl_easy_setopt() failed : %s %s:%d\n",    \
				curl_easy_strerror(rc_easy), __FILE__, __LINE__);               \
			if (first_err == CURLE_OK)                                          \
				first_err = rc_easy;                                            \
		}                                                                       \
	} while (0)

	SET_OPT(CURLOPT_HTTPGET, 1L);
	SET_OPT(CURLOPT_HTTP_VERSION, (long) CURL_HTTP_VERSION_2TLS);
	SET_OPT(CURLOPT_SSL_VERIFYPEER, 0L);
	SET_OPT(CURLOPT_SSL_VERIFYHOST, 0L);
	/* Prefer waiting for an existing connection to multiplex on over opening a new one. */
	SET_OPT(CURLOPT_PIPEWAIT, 1L);
	SET_OPT(CURLOPT_WRITEFUNCTION, response_writer);
	SET_OPT(CURLOPT_WRITEDATA, resp_buff);
	SET_OPT(CURLOPT_CONNECTTIMEOUT_MS, (long) conn_timeout);
	SET_OPT(CURLOPT_NOSIGNAL, 1L);
	SET_OPT(CURLOPT_HEADERFUNCTION, (curl_write_callback) NULL);
	SET_OPT(CURLOPT_HEADERDATA, (void *) NULL);
#ifdef DEBUG
	SET_OPT(CURLOPT_VERBOSE, 1L);
#endif

#undef SET_OPT

	return first_err;
}

int execute_epoll(CURLM* multi, struct thread_params* params) {
	int th_id = params->th_id;
	int timeout_counter = 0;
	CURLMcode retval;

	// fprintf(stderr, "INFO: running epoll() implementation\n");
	int running_handles = 100;
	int it = 0;
	int event_count;
	struct timespec t0;
	struct timespec t1;
	long long t0_ms;
	long long t1_ms;
	long long t_diff;
	
	retval = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &running_handles);
	if ( retval != CURLM_OK) {
		fprintf(stderr, "%d:ERROR curl_multi_socket_action() return error = %d %s:%d\n", th_id, retval,__FILE__, __LINE__);
		return 1;
	}
	fprintf(stderr, "# of running_handles = %d\n", running_handles);

	int ep_wait_time;
	while (running_handles > 0 &&  params->time_left > 0) {
		struct epoll_event events[MAX_EVENTS];
		clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
		ep_wait_time = min(params->waiting_time, params->time_left);
		event_count = epoll_wait(params->epoll_fd, events, MAX_EVENTS, ep_wait_time);
		clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
		t0_ms = (t0.tv_sec * 1000 + (t0.tv_nsec / 1000000));
		t1_ms = (t1.tv_sec * 1000 + (t1.tv_nsec / 1000000));
		t_diff = t1_ms - t0_ms;

		if (event_count == -1) {
			fprintf(stderr, "%d:ERROR epoll_wait:%d|%s\n", th_id, errno, strerror(errno));
			assert(0);
		}
		else if (event_count == 0) {
			params->time_left -= ep_wait_time;
			// fprintf(stderr, "%d: epoll_wait: running_handles:%d event_counts:%d | time spend:%lld | params->time_left:%d\n", th_id, running_handles, event_count, ep_wait_time, params->time_left);
			retval = curl_multi_socket_action(multi, CURL_SOCKET_TIMEOUT, 0, &running_handles);
			if ( retval != CURLM_OK) {
				fprintf(stderr, "%d:ERROR curl_multi_socket_action() return error = %d %s:%d\n", th_id, retval,__FILE__, __LINE__);
assert(0);
				break;
			}
		}
		else {
			for(it = 0; it < event_count; it++) {
				int ev_bitmask = 0;
				switch (events[it].events) {
					case EPOLLIN:
						ev_bitmask |= CURL_CSELECT_IN;
						break;
					case EPOLLOUT:
						ev_bitmask |= CURL_CSELECT_OUT;
						break;
					case EPOLLERR:
						ev_bitmask |= CURL_CSELECT_ERR;
						break;
					default:
						// More than one I/O event is detected on this fd, let curl figure out on its own what set of events has happened
						ev_bitmask = 0;
				} 
				retval = curl_multi_socket_action(multi, events[it].data.fd, ev_bitmask, &running_handles);
				if ( retval != CURLM_OK) {
					fprintf(stderr, "%d:ERROR curl_multi_socket_action() return error = %d %s:%d\n", th_id, retval,__FILE__, __LINE__);
					assert(0);
					break;
				}
			}
			params->time_left -= t_diff; 
		}

	}
	if (running_handles > 0) {
		timeout_counter++;
	}
	return timeout_counter;
}

void* do_task(void* ptr) {
	struct thread_params* params = (struct thread_params*) ptr;
	int it, idx;
	CURLMcode retval = CURLM_OK;
	CURL* easy[MAX_URLS];
	CURLM* multi = curl_multi_init();
	
	retval = curl_multi_setopt(multi, CURLMOPT_MAXCONNECTS, 100);
	if ( retval != CURLM_OK) {
		fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
	}
	retval = curl_multi_setopt(multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
	if ( retval != CURLM_OK) {
		fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
	}

	int req_timeout = params->time_left;

	for(idx = 0; idx < MAX_URLS; idx++) {
		easy[idx] = curl_easy_init();
	}

	for(it = 0; it < params->iteration; it++) {
		params->time_left = req_timeout;
		/* SET socket callback function */
		retval = curl_multi_setopt(multi, CURLMOPT_SOCKETFUNCTION, socket_callback);
		if ( retval != CURLM_OK) {
			fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
			return NULL;
		}

		/* SET socket callback function parameters*/
		retval = curl_multi_setopt(multi, CURLMOPT_SOCKETDATA, ptr);
		if ( retval != CURLM_OK) {
			fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
			return NULL;
		}

		/* SET timmer function */	
		retval = curl_multi_setopt(multi, CURLMOPT_TIMERFUNCTION, timer_callback);
		if ( retval != CURLM_OK) {
			fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
			return NULL;
		}

		/* SET timmer function parameters */	
		retval = curl_multi_setopt(multi, CURLMOPT_TIMERDATA, ptr);
		if ( retval != CURLM_OK) {
			fprintf(stderr,"\nERROR curl_multi_setopt() failed : %s %s:%d\n", curl_multi_strerror(retval), __FILE__, __LINE__);
			return NULL;
		}
		
		for(idx = 0; idx < MAX_URLS; idx++) {
			int url_idx = idx % MAX_URLS;
			CURLcode rv = curl_easy_setopt(easy[idx], CURLOPT_URL, urls[url_idx]);
			if ( rv != CURLE_OK) {
				fprintf(stderr, "\nERROR curl_multi_perform() return error = %d %s:%d\n", retval,__FILE__, __LINE__);
				return NULL;
			}

			set_curl_common_option(easy[idx], params, req_timeout);
			retval = curl_multi_add_handle(multi, easy[idx]);
			if ( retval != CURLM_OK) {
				fprintf(stderr, "\nERROR curl_multi_perform() return error = %d %s:%d\n", retval,__FILE__, __LINE__);
				return NULL;
			}
		}

		params->timeouts += execute_epoll(multi, params);
		
		for(idx = 0; idx < MAX_URLS; idx++) {
			retval = curl_multi_remove_handle(multi, easy[idx]);
			if (retval != CURLM_OK) {
				fprintf(stderr, "\nERROR curl_multi_remove_handle RTB failed with rc = %d %s:%d\n", retval, __FILE__, __LINE__);
				return NULL;
			}
		}
	}

	curl_multi_cleanup(multi);
	return NULL;
}

/* 
 * 1. argv[1] number of threads needs to be created 
 * 2. argv[2] number of iteration per thread
 * 3. argv[3] http request timeout in milli-seconds
 */
int main(int argc, char** argv)
{
	if(4 != argc) {
		fprintf(stderr, "\nUsage: thread_count thread_iteration, req_timeout_ms\n");
		return 1;
	}

	struct timespec t0;
	struct timespec t1;
	double t0_ms;
	double t1_ms;
	double t_diff;
	int idx = 0;
	int ret = 0;
	struct thread_params* params = NULL;

	int nthreads = atoi(argv[1]);
	int iteration = atoi(argv[2]);
	int req_timeout = atoi(argv[3]);

	/*
		 if (run_epoll) {
		 ProfilerStart("/tmp/epoll.prof");
		 }else{
		 ProfilerStart("/tmp/select.prof");
		 }
		 */

	curl_global_init(CURL_GLOBAL_ALL);
	printf("Using %s\n", curl_version());

	params = (struct thread_params *) malloc(sizeof(struct thread_params) * nthreads);
	if (NULL == params) {
		fprintf(stderr, "\nERROR malloc() failed %s:%d\n", __FILE__, __LINE__);
		return 1;
	}

	clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
	// Thread start
	pthread_t tid[MAX_THREADS];
	for (idx = 0; idx < nthreads; idx++) {
		params[idx].th_id = idx;
		params[idx].time_left = req_timeout;
		params[idx].timeouts = 0;
		params[idx].iteration = iteration;
		params[idx].bytes_received = 0;
		// create epoll_fd for thread;
		params[idx].epoll_fd = epoll_create(50);
		if (params[idx].epoll_fd == -1) {
			fprintf(stderr, "ERROR epoll_create");
			return 1;
		}

		ret = pthread_create(&tid[idx], NULL, do_task, (void*) (&params[idx]));
		if (ret != 0) {
			fprintf(stderr, "\nERROR pthread_create() ret = %d %s:%d\n", ret, __FILE__, __LINE__);
			return 1;
		}
	}

	// Thread wait
	for (idx = 0; idx < nthreads; idx++) {
		ret = pthread_join(tid[idx], NULL);
		if (ret != 0) {
			fprintf(stderr, "\nERROR pthread_join() ret = %d %s:%d\n", ret, __FILE__, __LINE__);
			return 1;
		}
	}


	curl_global_cleanup();
	// cleanup all epoll_fds
	for(idx = 0; idx < nthreads; idx++) {
		close(params[idx].epoll_fd);
	}
	printf(">>> bye bye\n");

	clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
	t0_ms = (t0.tv_sec * 1000.0 + (t0.tv_nsec / 1000000.0));
	t1_ms = (t1.tv_sec * 1000.0 + (t1.tv_nsec / 1000000.0));
	t_diff = t1_ms - t0_ms;
	curl_global_cleanup();

	int total_timeouts = 0;
	int total_bytes_received = 0;
	for(idx = 0; idx < nthreads; idx++) {
		total_timeouts += params[idx].timeouts;
		total_bytes_received  += params[idx].bytes_received;

	}

	fprintf(stderr, "\nMain thread returning after time %lf ms and total timeouts = %d, data_received = %d bytes\n", t_diff, total_timeouts, total_bytes_received);
	return 0;
}
-------------------------------------------------------------------
Unsubscribe: https://cool.haxx.se/list/listinfo/curl-library
Etiquette:   https://curl.haxx.se/mail/etiquette.html

Reply via email to