I will post a trimmed-down part of the code in an attachment - hopefully it will 
be available. If it is not too much to ask, I would be very grateful if you could take a look :)  
Its purpose is to run N connections every second - the external API is 
rate-limited. From what I've seen, usually my counter `(sent-received)` is 
either equal or very close to `still_alive`. However, on rare occasions, my 
counter maxes out (equal to `MAX_CONN`) and LibCURL reports 0 running handles. 
Then it stays like that indefinitely.

Temat: Re: Curl Multi with easy handle timeout
Data: 2023-04-28 17:10
Nadawca: "Daniel Stenberg" <dan...@haxx.se>
Adresat: "herhor67" <herho...@interia.pl>
DW: "herhor67 via curl-library" <curl-library@lists.haxx.se>

> On Fri, 28 Apr 2023, herhor67 wrote:
> 
> > it seems that I must have some kind of logic error in my application.
> 
> Make sure you check the return code from all libcurl calls to detect if it
> returns an error somewhere.
> 
> > Does a handle with timeout have some different message/status?
> 
> When a timeout triggers, the transfer stops and it returns with an error. Then
> that transfer will be "delivered" as complete just like any other transfer
> that completes.
> 
> > There seems to only be "CURLMSG_DONE" but maybe something in the `CURLcode
> > result` field?
> 
> Yes, result holds a non-zero value for failures.
> 
> -- 
> 
>   / daniel.haxx.se
>   | Commercial curl support up to 24x7 is available!
>   | Private help, bug fixes, support, ports, new features
>   | https://curl.se/support.html
> 
/// Creates an easy handle for `url`, configures it and attaches it to the multi handle `cm`.
///
/// The response body is collected into a heap-allocated std::string whose address is stored
/// both as CURLOPT_WRITEDATA and as CURLOPT_PRIVATE. Once the transfer is attached, whoever
/// reads the completion message is responsible for recovering that pointer (CURLINFO_PRIVATE)
/// and deleting it, and for removing and cleaning up the easy handle.
///
/// @param cm   Multi handle the new transfer is added to.
/// @param url  URL to fetch.
/// @throws std::runtime_error if the easy handle cannot be created, an option cannot be set,
///         or the handle cannot be added to `cm`. Nothing is leaked in that case.
void add_url_transfer(CURLM* cm, const char* url)
{
	CURL* eh = curl_easy_init();
	if (eh == nullptr)
		throw std::runtime_error("curl_easy_init() failed!");

	std::string* buffer = new std::string();
	buffer->reserve(CURL_MAX_WRITE_SIZE);

	// Remembers the first failing option; later options are skipped once one has failed.
	CURLcode ecode = CURLE_OK;
	auto set_option = [&](CURLoption option, auto value) -> void
	{
		if (ecode == CURLE_OK)
			ecode = curl_easy_setopt(eh, option, value);
	};

	set_option(CURLOPT_URL, url);

	set_option(CURLOPT_NOPROGRESS, 1L);

	// required in multithreaded
	set_option(CURLOPT_NOSIGNAL, 1L);

	// abort if slower than 10 bytes/sec during 15 seconds
	set_option(CURLOPT_LOW_SPEED_TIME, 15L);
	set_option(CURLOPT_LOW_SPEED_LIMIT, 10L);

	// full timeout if over one minute
	set_option(CURLOPT_TIMEOUT, 60L);

	set_option(CURLOPT_WRITEFUNCTION, write_string_callback);
	set_option(CURLOPT_WRITEDATA, buffer);
	set_option(CURLOPT_PRIVATE, buffer);

	if (ecode != CURLE_OK)
	{
		cerr << "CURL setup failed, message: " << curl_easy_strerror(ecode) << endl;
		curl_easy_cleanup(eh);
		delete buffer;
		throw std::runtime_error("Could not configure a CURL easy handle!");
	}

	// If the handle is not actually attached, no completion message will ever arrive for it,
	// so a caller that counted it as "sent" would wait forever. Fail loudly instead.
	const CURLMcode mcode = curl_multi_add_handle(cm, eh);
	if (mcode != CURLM_OK)
	{
		cerr << "MultiCURL failed, message: " << curl_multi_strerror(mcode) << endl;
		curl_easy_cleanup(eh);
		delete buffer;
		throw std::runtime_error("Could not add a CURL easy handle to the multi handle!");
	}

	cout << "Added URL transfer..." << endl;
}

/// Convenience overload: forwards the URL held in a std::string to the
/// `const char*` overload of add_url_transfer.
inline void add_url_transfer(CURLM* cm, const std::string& apiurl)
{
	const char* raw_url = apiurl.c_str();
	add_url_transfer(cm, raw_url);
}


void download_data(std::function<const std::string()> url_cb, std::function<bool(size_t, size_t, size_t, size_t, std::chrono::nanoseconds)> progress_cb)
{
	size_t sent = 0;
	size_t received = 0;

	auto delay = std::chrono::duration_cast<std::chrono::nanoseconds>(100ms);
	auto last = std::chrono::steady_clock::now() - delay;
	std::queue<std::string> to_retry;

	CURLM* cm = curl_multi_init();
	
	curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, MAX_CONN);

	auto add_transfer_if_allowed = [&]() -> void
	{
		if (sent - received >= MAX_CONN)
			return;

		auto now = std::chrono::steady_clock::now();
		auto dur = now - last;
		if (dur < delay)
			return;

		last = now;

		if (!to_retry.empty())
		{
			add_url_transfer(cm, to_retry.front());
			to_retry.pop();
			return;
		}

		const std::string& url = url_cb();
		if (!url.empty())
		{
			add_url_transfer(cm, url);
			++sent;
		}
	};

	int still_alive = 0; // running handles
	do
	{
		CURLMcode mcode = curl_multi_perform(cm, &still_alive); // exit if error

		if (mcode != CURLM_OK)
		{
			cerr << "MultiCURL failed, message: " << curl_multi_strerror(mcode) << endl;
			throw std::runtime_error("MultiCURL had an error!");
		}

		while (true)
		{
			add_transfer_if_allowed();

			int msgs_left = 0;
			CURLMsg* msg = curl_multi_info_read(cm, &msgs_left);

			if (msg == nullptr)
				break;

			//if (msg->msg != CURLMSG_DONE) // there are no other mesages defined rn
			//{
			//	cerr << "Error: " << curl_easy_strerror(msg->data.result) << " CURLMsg (" << msg->msg << ")" << endl;
			//	continue;
			//}

			cout << "Received..." << endl;
			
			
			CURL* eh = msg->easy_handle;

			std::string* content_ptr;
			curl_easy_getinfo(eh, CURLINFO_PRIVATE, &content_ptr); // recover content pointer from private field (passing str-ptr-ptr to get str-ptr)

			bool success = false;
			try
			{
				CURLcode code = msg->data.result;

				if (code != CURLE_OK)
					throw curl_error(curl_easy_strerror(code));

				bool hasdata = parse_data(content_ptr);
				success = true;
			}
			catch (const api_error& e)
			{
				cerr << "API error: " << e.what() << endl;
				[...]
			}
			catch (const json_error& e)
			{
				cerr << "JSON error: " << e.what() << endl;
			}
			catch (const curl_error& e)
			{
				cerr << "CURL error: " << e.what() << endl;
			}

			delete content_ptr;

			if (success)
			{
				++received;
			}
			else
			{
				char* url; // we dont own so we dont delete
				curl_easy_getinfo(eh, CURLINFO_EFFECTIVE_URL, &url);
				to_retry.push(url);
			}

			curl_multi_remove_handle(cm, eh);
			curl_easy_cleanup(eh);

		} // endwhile(true)

		if (received < sent)
			curl_multi_wait(cm, nullptr, 0, 100, nullptr);

	}
	while (progress_cb(sent, received, still_alive, delay));

	cout << "MultiCURL done" << endl;
	curl_multi_cleanup(cm);
	cout << "MultiCURL cleanup done" << endl;
}


// example of use
/// Downloads the API data for every id in `ids` by driving download_data with two callbacks:
/// one that hands out the URL of the next id, and one that prints a status line at most about
/// once per second and decides whether the download loop should keep running.
void download_vector(const std::vector<PlrID>& ids)
{
	const size_t total = ids.size();
	size_t next_index = 0;

	auto last_report = std::chrono::steady_clock::now();

	// Hands out the URL for the next unprocessed id; an empty string once all ids are used up.
	auto url_cb = [&]() -> const std::string
	{
		if (next_index < total)
			return get_api_url(ids.at(next_index++));

		return std::string{};
	};

	// Reports progress and keeps the loop alive while ids remain unsent or responses are outstanding.
	auto progress_cb = [&](size_t sent, size_t recv, size_t alive, std::chrono::nanoseconds delay) -> bool
	{
		const auto current = std::chrono::steady_clock::now();
		const bool report_due = (current - last_report) > 1s;
		if (report_due)
		{
			last_report = current;
			cout << srv_to_nm(srv) << " MultiCURL main loop. Sent:" << sent << ", Recv:" << recv << ", Diff:" << (sent - recv) << ", Live:" << alive << ", Tdel:" << delay << endl;
		}

		if (sent < total)
			return true;

		return recv < sent;
	};

	download_data(url_cb, progress_cb);
}
-- 
Unsubscribe: https://lists.haxx.se/mailman/listinfo/curl-library
Etiquette:   https://curl.se/mail/etiquette.html

Reply via email to