I encountered this problem too. Chris recommended using a steady_timer to construct an async semaphore. I wasn't happy with that, so I wrote an async_semaphore type.
Klemens then picked up on the async_semaphore type and refined it:
https://github.com/klemens-morgenstern/sam

On Tue, 23 May 2023 at 00:44, Suraaj K S via Boost-users <boost-users@lists.boost.org> wrote:

> Hi all,
>
> In boost-asio, I realized that there was no easy way to have something
> that resembles a condition variable.
>
> However, I realized that I could get something very similar using stackful
> coroutines
> (https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/spawn.html),
> while using an io_context as a 'queue' for completion tokens. Below is my
> approach:
>
> #include "asio_types.hpp"
> #include <atomic>
> #include <iostream>
>
> class async_pending_queue {
> public:
>   async_pending_queue()
>       : pending_handlers_(0), strand_(pending_queue_),
>         wg_(asio::make_work_guard(pending_queue_)) {}
>
>   template <typename CompletionToken>
>   auto async_submit(
>       CompletionToken &&token,
>       std::function<void(void)> atomic_action = [] {}) {
>     auto init = [this, &atomic_action](auto completion_handler) {
>       auto posted_lambda = [handler = std::move(completion_handler),
>                             this]() mutable {
>         pending_handlers_--;
>         asio_sys_err ec;
>         handler(ec);
>       };
>       post(strand_, std::move(posted_lambda));
>       pending_handlers_++;
>       atomic_action();
>     };
>     return asio::async_initiate<CompletionToken, void(asio_sys_err)>(init,
>                                                                      token);
>   }
>
>   int pending_count() { return pending_handlers_.load(); }
>
>   // It may not run 1 and run 0
>   bool try_run_one() {
>     auto cnt = pending_queue_.poll_one();
>     std::cout << "completion token result" << cnt << std::endl;
>     bool ret = (cnt == 1);
>     return ret;
>   }
>
> private:
>   std::atomic<unsigned int> pending_handlers_;
>   asio_ioctx pending_queue_;
>   asio_ioctx::strand strand_;
>   decltype(asio::make_work_guard(pending_queue_)) wg_;
> };
>
> Here, one simply calls `my_async_pending_queue.async_submit(yield)` when
> calling from a stackful coroutine.
> The coroutine can be continued by calling
> `my_async_pending_queue.try_run_one()`.
>
> Using this, I wanted to build a 'memory checker'. It has two functions:
> `request_space` and `free_space`. A coroutine calls `request_space`, which
> may block if there is no space left. Meanwhile, another thread / coroutine
> can call `free_space`, which will run blocked coroutines if possible.
>
> I built a toy memory checker wrapper as follows:
>
> #ifndef MEM_CHECK_HPP
> #define MEM_CHECK_HPP
>
> #include <cstddef>
> #include <mutex>
> #include <queue>
>
> #include <boost/asio.hpp>
> #include <boost/asio/spawn.hpp>
>
> namespace asio = boost::asio;
> using asio_ioctx = asio::io_context;
> using asio_sys_err = boost::system::error_code;
> using asio::yield_context;
>
> #include "async_pending_queue.hpp"
>
> class MemoryChecker {
> public:
>   using bytes_cnt = size_t;
>
>   MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem = 1024,
>                 bytes_cnt initial_fill = 0);
>
>   // This requests some space, and possibly yields;
>   void request_space(bytes_cnt cnt, yield_context yield);
>   void free_space(bytes_cnt cnt);
>
> private:
>   bytes_cnt get_available_mem();
>
>   const bytes_cnt total_mem_;
>   bytes_cnt mem_used_;
>   std::queue<bytes_cnt> request_queue_;
>   async_pending_queue request_routines_;
>   std::mutex lock_;
>   asio_ioctx::strand fifo_strand_;
>   asio_ioctx &completion_ioctx_;
> };
>
> inline MemoryChecker::MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem,
>                                     bytes_cnt initial_fill)
>     : total_mem_(total_mem), mem_used_(initial_fill), request_queue_{},
>       lock_{}, request_routines_{}, completion_ioctx_(ioctx),
>       fifo_strand_(ioctx) {}
>
> inline MemoryChecker::bytes_cnt MemoryChecker::get_available_mem() {
>   assert(total_mem_ >= mem_used_);
>   return total_mem_ - mem_used_;
> }
>
> inline void MemoryChecker::request_space(bytes_cnt cnt, yield_context yield) {
>   // if (cnt > total_mem_) throw logic_error
>   lock_.lock();
>   assert(cnt <= total_mem_);
>   assert(cnt > 0);
>   if (request_queue_.empty()) {
>     assert(request_routines_.pending_count() == 0);
>     if (get_available_mem() >= cnt) {
>       // We bypass the pending queue
>       mem_used_ += cnt;
>       lock_.unlock();
>       return;
>     }
>   }
>   assert(request_queue_.size() == request_routines_.pending_count());
>   std::cout << "Pushing " << cnt << std::endl;
>   request_queue_.push(cnt);
>   auto wg = asio::make_work_guard(completion_ioctx_);
>   request_routines_.async_submit(yield, [this] { lock_.unlock(); });
>   auto oldest_req{request_queue_.front()};
>   assert(cnt == oldest_req);
>   request_queue_.pop();
>   mem_used_ += cnt;
>   assert(request_queue_.size() == request_routines_.pending_count());
>   asio::post(fifo_strand_, yield);
> }
>
> inline void MemoryChecker::free_space(bytes_cnt cnt) {
>   {
>     std::lock_guard<std::mutex> lg{lock_};
>     mem_used_ -= cnt;
>     // Here, we own the lock, and free as many coroutines as we can
>     while (true) {
>       if (request_queue_.size() == 0) {
>         std::cout << "No pending requests. Bailing" << std::endl;
>         break;
>       }
>       assert(request_queue_.size() == request_routines_.pending_count());
>       auto oldest_req{request_queue_.front()};
>       auto available_mem{get_available_mem()};
>       if (available_mem < oldest_req) {
>         std::cout << "Oldest request is larger than available_mem. Bailing"
>                   << std::endl;
>         break;
>       }
>       assert(request_routines_.try_run_one() == true);
>     }
>   }
> }
>
> #endif /* MEM_CHECK_HPP */
>
> Here is a test program that can run it:
>
> #include "mem_check.hpp"
> #include <thread>
> #include <unistd.h>
>
> constexpr size_t mc_size{4};
> asio_ioctx ioctx;
> size_t total{0};
> MemoryChecker mc{ioctx, mc_size};
>
> void requestor_coroutine(size_t rq, yield_context yield) {
>   asio::steady_timer t(ioctx);
>   while (true) {
>     total += rq;
>     mc.request_space(rq, yield);
>     std::cout << "Got requested space ";
>     asio_sys_err ec;
>   }
> }
>
> int main() {
>   asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(1, yield); });
>   asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(2, yield); });
>   asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(3, yield); });
>   asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(4, yield); });
>   std::thread t([] { ioctx.run(); });
>   while (true) {
>     getchar();
>     std::cout << total << std::endl;
>     if (total > 0) {
>       std::cout << "freeing" << std::endl;
>       mc.free_space(1);
>       total -= 1;
>     }
>   }
>   t.join();
> }
>
> Finally, the problem we face is as follows. When we run the program, the
> assertion `total_mem_ >= mem_used_` fails. On some further investigation, I
> realized that our completion token was being called even when we do not
> call `try_run_one`, which was very weird.
>
> Finally, somewhat more surprisingly, if I replace
> `post(strand_, std::move(posted_lambda));` by
> `post(pending_queue_, std::move(posted_lambda));`, things seem to work.
> However, the asio documentation says that only strands guarantee a FIFO
> execution order. I am not sure if using a simple `io_context` will work as
> a FIFO queue (even though it seems to in these examples).
>
> Any inputs would be helpful - I am happy to hear the problem in this
> implementation, as well as other implementations (for example, using a
> std::queue as a proper queue instead of this io_context hack).
>
> This question has also been posted here:
> https://stackoverflow.com/questions/76310252/c-boost-asio-building-a-conditional-variable-using-io-context-strand
>
> Thanks,
> Suraaj
> _______________________________________________
> Boost-users mailing list
> Boost-users@lists.boost.org
> https://lists.boost.org/mailman/listinfo.cgi/boost-users
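On the std::queue alternative raised at the end of the question: a minimal sketch of the bookkeeping, assuming each waiter is represented by a plain std::function callback instead of an Asio completion handler. The names fifo_mem_checker and demo_grants are hypothetical, and the sketch deliberately leaves Asio out, so it says nothing about why the strand version misbehaves.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>

// Hypothetical FIFO space tracker: requests are granted strictly in arrival
// order, and callbacks are invoked only after the mutex has been released.
class fifo_mem_checker {
public:
    explicit fifo_mem_checker(std::size_t total) : total_(total) {}

    // Runs on_granted right away if nobody is queued and cnt bytes fit;
    // otherwise queues the request behind the existing waiters.
    void request_space(std::size_t cnt, std::function<void()> on_granted) {
        {
            std::lock_guard<std::mutex> lg(lock_);
            if (!waiters_.empty() || total_ - used_ < cnt) {
                waiters_.emplace(cnt, std::move(on_granted));
                return;
            }
            used_ += cnt;
        }
        on_granted();
    }

    // Assumes cnt <= bytes currently in use. Grants as many queued requests
    // as now fit, oldest first, and stops at the first one that does not.
    void free_space(std::size_t cnt) {
        std::vector<std::function<void()>> ready;
        {
            std::lock_guard<std::mutex> lg(lock_);
            used_ -= cnt;
            while (!waiters_.empty() &&
                   total_ - used_ >= waiters_.front().first) {
                used_ += waiters_.front().first;
                ready.push_back(std::move(waiters_.front().second));
                waiters_.pop();
            }
        }
        for (auto& handler : ready) {
            handler();
        }
    }

private:
    const std::size_t total_;
    std::size_t used_ = 0;
    std::queue<std::pair<std::size_t, std::function<void()>>> waiters_;
    std::mutex lock_;
};

// Demo with 4 bytes in total: 3 is granted at once, 2 and 1 have to queue.
std::vector<int> demo_grants() {
    fifo_mem_checker mc(4);
    std::vector<int> granted;
    mc.request_space(3, [&] { granted.push_back(3); });
    mc.request_space(2, [&] { granted.push_back(2); });  // only 1 byte free
    mc.request_space(1, [&] { granted.push_back(1); });  // queued behind 2
    mc.free_space(3);
    return granted;  // {3, 2, 1}
}
```

In an Asio program the stored callback would more likely be a completion handler that gets posted to its associated executor rather than called inline from free_space; that part is left out here.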