I will post a thinned out part of the code in an attachment - hopefully it will
be available. If this is not too much to ask - I would be greatly grateful :)
Its purpose is to run N connections every second - the external API is
rate-limited. From what I've seen, usually my counter `(sent-received)` is
either equal or very close to `still_alive`. However, on rare occasions, my
counter maxes out (equal to `MAX_CONN`) and LibCURL reports 0 running handles.
Then it stays like that indefinitely. Temat: Re: Curl Multi with easy handle
timeoutData: 2023-04-28 17:10Nadawca: "Daniel Stenberg"
<dan...@haxx.se>Adresat: "herhor67" <herho...@interia.pl>; DW:
"herhor67 via curl-library" <curl-library@lists.haxx.se>; > On Fri, 28
Apr 2023, herhor67 wrote:
>
> > it seems that I must have some kind of logic error in my
application.
>
> Make sure you check the return code from all libcurl calls to detect
if it
> returns an error somewhere.
>
> > Does a handle with timeout have some different message/status?
>
> When a timeout triggers, the transfer stops and it returns with an
error. Then
> that transfer will be "delivered" as complete just like any other
transfer
> that completes.
>
> > There seems to only be "CURLMSG_DONE" but maybe somehing in the
`CURLcode
> > result` field?
>
> Yes, result holds a non-zero value for failures.
>
> --
>
> / daniel.haxx.se
> | Commercial curl support up to 24x7 is available!
> | Private help, bug fixes, support, ports, new features
> | https://curl.se/support.html
>
void add_url_transfer(CURLM* cm, const char* url)
{
CURL* eh = curl_easy_init();
curl_easy_setopt(eh, CURLOPT_URL, url);
curl_easy_setopt(eh, CURLOPT_NOPROGRESS, 1L);
// required in multithreaded
curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1L);
// abort if slower than 10 bytes/sec during 15 seconds
curl_easy_setopt(eh, CURLOPT_LOW_SPEED_TIME, 15L);
curl_easy_setopt(eh, CURLOPT_LOW_SPEED_LIMIT, 10L);
// full timeout if over one minute
curl_easy_setopt(eh, CURLOPT_TIMEOUT, 60L);
std::string* buffer = new std::string();
buffer->reserve(CURL_MAX_WRITE_SIZE);
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_string_callback);
curl_easy_setopt(eh, CURLOPT_WRITEDATA, buffer);
curl_easy_setopt(eh, CURLOPT_PRIVATE, buffer);
curl_multi_add_handle(cm, eh);
cout << "Added URL transfer..." << endl;
}
inline void add_url_transfer(CURLM* cm, const std::string& apiurl)
{
add_url_transfer(cm, apiurl.c_str());
}
void download_data(std::function<const std::string()> url_cb, std::function<bool(size_t, size_t, size_t, size_t, std::chrono::nanoseconds)> progress_cb)
{
size_t sent = 0;
size_t received = 0;
auto delay = std::chrono::duration_cast<std::chrono::nanoseconds>(100ms);
auto last = std::chrono::steady_clock::now() - delay;
std::queue<std::string> to_retry;
CURLM* cm = curl_multi_init();
curl_multi_setopt(cm, CURLMOPT_MAXCONNECTS, MAX_CONN);
auto add_transfer_if_allowed = [&]() -> void
{
if (sent - received >= MAX_CONN)
return;
auto now = std::chrono::steady_clock::now();
auto dur = now - last;
if (dur < delay)
return;
last = now;
if (!to_retry.empty())
{
add_url_transfer(cm, to_retry.front());
to_retry.pop();
return;
}
const std::string& url = url_cb();
if (!url.empty())
{
add_url_transfer(cm, url);
++sent;
}
};
int still_alive = 0; // running handles
do
{
CURLMcode mcode = curl_multi_perform(cm, &still_alive); // exit if error
if (mcode != CURLM_OK)
{
cerr << "MultiCURL failed, message: " << curl_multi_strerror(mcode) << endl;
throw std::runtime_error("MultiCURL had an error!");
}
while (true)
{
add_transfer_if_allowed();
int msgs_left = 0;
CURLMsg* msg = curl_multi_info_read(cm, &msgs_left);
if (msg == nullptr)
break;
//if (msg->msg != CURLMSG_DONE) // there are no other mesages defined rn
//{
// cerr << "Error: " << curl_easy_strerror(msg->data.result) << " CURLMsg (" << msg->msg << ")" << endl;
// continue;
//}
cout << "Received..." << endl;
CURL* eh = msg->easy_handle;
std::string* content_ptr;
curl_easy_getinfo(eh, CURLINFO_PRIVATE, &content_ptr); // recover content pointer from private field (passing str-ptr-ptr to get str-ptr)
bool success = false;
try
{
CURLcode code = msg->data.result;
if (code != CURLE_OK)
throw curl_error(curl_easy_strerror(code));
bool hasdata = parse_data(content_ptr);
success = true;
}
catch (const api_error& e)
{
cerr << "API error: " << e.what() << endl;
[...]
}
catch (const json_error& e)
{
cerr << "JSON error: " << e.what() << endl;
}
catch (const curl_error& e)
{
cerr << "CURL error: " << e.what() << endl;
}
delete content_ptr;
if (success)
{
++received;
}
else
{
char* url; // we dont own so we dont delete
curl_easy_getinfo(eh, CURLINFO_EFFECTIVE_URL, &url);
to_retry.push(url);
}
curl_multi_remove_handle(cm, eh);
curl_easy_cleanup(eh);
} // endwhile(true)
if (received < sent)
curl_multi_wait(cm, nullptr, 0, 100, nullptr);
}
while (progress_cb(sent, received, still_alive, delay));
cout << "MultiCURL done" << endl;
curl_multi_cleanup(cm);
cout << "MultiCURL cleanup done" << endl;
}
// example of use
void download_vector(const std::vector<PlrID>& ids)
{
size_t pos = 0;
size_t len = ids.size();
auto now_write = std::chrono::steady_clock::now();
auto last_write = std::chrono::steady_clock::now();
auto url_cb = [&]() -> const std::string
{
if (pos >= len)
return std::string{};
return get_api_url(ids.at(pos++));
};
auto progress_cb = [&](size_t sent, size_t recv, size_t alive, std::chrono::nanoseconds delay) -> bool
{
now_write = std::chrono::steady_clock::now();
if (now_write - last_write > 1s)
{
last_write = now_write;
cout << srv_to_nm(srv) << " MultiCURL main loop. Sent:" << sent << ", Recv:" << recv << ", Diff:" << (sent - recv) << ", Live:" << alive << ", Tdel:" << delay << endl;
}
return (sent < len) || (recv < sent);
};
download_data(url_cb, progress_cb);
}
--
Unsubscribe: https://lists.haxx.se/mailman/listinfo/curl-library
Etiquette: https://curl.se/mail/etiquette.html