I encountered this problem too.

Chris recommended using a steady_timer to construct an async semaphore. I
wasn't happy with that, so I wrote an async_semaphore type.

Klemens then picked up on that and refined it.

https://github.com/klemens-morgenstern/sam


On Tue, 23 May 2023 at 00:44, Suraaj K S via Boost-users <
boost-users@lists.boost.org> wrote:

> Hi all,
>
> In boost-asio, I realized that there was no easy way to have something
> that resembles a condition variable.
>
> However, I realized that I could get something very similar using stackful
> coroutines (
> https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/spawn.html),
> while using an io_context as a 'queue' for completion tokens. Below is my
> approach:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *#include "asio_types.hpp"#include <atomic>#include <iostream>class
> async_pending_queue {public:  async_pending_queue()      :
> pending_handlers_(0), strand_(pending_queue_),
> wg_(asio::make_work_guard(pending_queue_)) {}  template <typename
> CompletionToken>  auto async_submit(      CompletionToken &&token,
> std::function<void(void)> atomic_action = [] {}) {    auto init = [this,
> &atomic_action](auto completion_handler) {      auto posted_lambda =
> [handler = std::move(completion_handler),
> this]() mutable {        pending_handlers_--;        asio_sys_err ec;
>   handler(ec);        };      post(strand_,std::move(posted_lambda));
> pending_handlers_++;      atomic_action();    };    return
> asio::async_initiate<CompletionToken, void(asio_sys_err)>(init,
>                                                          token);  }  int
> pending_count() { return pending_handlers_.load(); }  // It may not run 1
> and run 0  bool try_run_one() {    auto cnt = pending_queue_.poll_one();
> std::cout << "completion token result" << cnt << std::endl;    bool ret =
> (cnt == 1);    return ret;  }private:  std::atomic<unsigned int>
> pending_handlers_;  asio_ioctx pending_queue_;  asio_ioctx::strand
> strand_;  decltype(asio::make_work_guard(pending_queue_)) wg_;};*
> Here, one simply calls `my_async_pending_queue.async_submit(yield)`,
> if calling from a stackful coroutine. The coroutine can be continued by
> calling `my_async_pending_queue.try_run_one()`.
>
> Using this, I wanted to build a 'memory checker'. It has two functions ->
> `request_space` and `free_space`. A coroutine calls `request_space`, which
> may block if there is no space left. Meanwhile, another thread / coroutine
> can call `free_space`, which will run blocked coroutines if possible.
>
> I built a toy memory checker wrapper as follows:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *#ifndef MEM_CHECK_HPP#define MEM_CHECK_HPP#include <cstddef>#include
> <mutex>#include <queue>#include <boost/asio.hpp>#include
> <boost/asio/spawn.hpp>namespace asio = boost::asio;using asio_ioctx =
> asio::io_context;using asio_sys_err = boost::system::error_code;using
> asio::yield_context;#include "async_pending_queue.hpp"class MemoryChecker
> {public:  using bytes_cnt = size_t;  MemoryChecker(asio_ioctx &ioctx,
> bytes_cnt total_mem = 1024, bytes_cnt initial_fill = 0);  // This requests
> some space, and possibly yields;  void request_space(bytes_cnt cnt,
> yield_context yield);  void free_space(bytes_cnt cnt);private:  bytes_cnt
> get_available_mem();  const bytes_cnt total_mem_;  bytes_cnt mem_used_;
> std::queue<bytes_cnt> request_queue_;  async_pending_queue
> request_routines_;  std::mutex lock_;  asio_ioctx::strand fifo_strand_;
> asio_ioctx &completion_ioctx_;};inline
> MemoryChecker::MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem,
>                      bytes_cnt initial_fill)    : total_mem_(total_mem),
> mem_used_(initial_fill), request_queue_{}, lock_{},
> request_routines_{}, completion_ioctx_(ioctx), fifo_strand_(ioctx) {}inline
> MemoryChecker::bytes_cnt MemoryChecker::get_available_mem() {
> assert(total_mem_ >= mem_used_);  return total_mem_ - mem_used_;}inline
> void MemoryChecker::request_space(bytes_cnt cnt, yield_context yield) {  //
> if (cnt > total_mem_) throw logic_error  lock_.lock();  assert(cnt <=
> total_mem_);  assert(cnt > 0);  if (request_queue_.empty()) {
> assert(request_routines_.pending_count() == 0);    if (get_available_mem()
> >= cnt) {      // We bypass the pending queue      mem_used_ += cnt;
> lock_.unlock();      return;    }  }  assert(request_queue_.size() ==
> request_routines_.pending_count());  std::cout << "Pushing " << cnt <<
> std::endl;  request_queue_.push(cnt);  auto wg =
> asio::make_work_guard(completion_ioctx_);
> request_routines_.async_submit(yield, [this] { lock_.unlock(); });  auto
> oldest_req{request_queue_.front()};  assert(cnt == oldest_req);
> request_queue_.pop();   mem_used_ += cnt;  assert(request_queue_.size() ==
> request_routines_.pending_count());  asio::post(fifo_strand_,
> yield);}inline void MemoryChecker::free_space(bytes_cnt cnt) {  {
> std::lock_guard<std::mutex> lg{lock_};    mem_used_ -= cnt;    // Here, we
> own the lock, and free as many coroutines as we can    while (true) {
> if (request_queue_.size() == 0) {        std::cout << "No pending requests.
> Bailing" << std::endl;        break;      }
> assert(request_queue_.size() == request_routines_.pending_count());
> auto oldest_req{request_queue_.front()};      auto
> available_mem{get_available_mem()};      if (available_mem < oldest_req) {
>       std::cout << "Oldest request is larger than available_mem. Bailing"
> << std::endl;        break;      }
> assert(request_routines_.try_run_one() == true);    }  }}#endif /*
> MEM_CHECK_HPP */*
>
> Here is a test program that can run it:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *#include "mem_check.hpp"#include <thread>#include <unistd.h>constexpr
> size_t mc_size{4};asio_ioctx ioctx;size_t total{0};MemoryChecker mc{ioctx,
> mc_size};void requestor_coroutine(size_t rq,yield_context yield) {
> asio::steady_timer t(ioctx);  while (true) {    total += rq;
> mc.request_space(rq, yield);    std::cout << "Got requested space ";
> asio_sys_err ec;  }}int main() {  asio::spawn(ioctx, [](yield_context
> yield) { requestor_coroutine(1,yield); });  asio::spawn(ioctx,
> [](yield_context yield) { requestor_coroutine(2,yield); });
> asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(3,yield);
> });  asio::spawn(ioctx, [](yield_context yield) {
> requestor_coroutine(4,yield); });  std::thread t([] { ioctx.run(); });
> while (true) {    getchar();    std::cout << total << std::endl;    if
> (total > 0) {      std::cout << "freeing" << std::endl;
> mc.free_space(1);      total -= 1;    }  }  t.join();}*
>
> Finally, the problem we face is as follows. When we run the program, the
> assertion `total_mem_ >= mem_used_` fails. On some further investigation, I
> realized that our completion token was being called even when we do not
> call `try_run_one`, which was very weird.
>
> Finally, somewhat more surprisingly, if I replace
> *post(strand_,std::move(posted_lambda));* by
> *post(pending_queue_,std::move(posted_lambda));*, things seem to work.
> However, the asio documentation says that only strands guarantee a FIFO
> execution order. I am not sure if using a simple `io_context` will work as
> a FIFO queue (even though it seems to in these examples).
>
> Any input would be helpful - I would be happy to hear what the problem with
> this implementation is, as well as about other implementations (for example,
> using a std::queue as a proper queue instead of this io_context hack).
>
> This question has also been posted here:
> https://stackoverflow.com/questions/76310252/c-boost-asio-building-a-conditional-variable-using-io-context-strand
> .
>
> Thanks,
> Suraaj
> _______________________________________________
> Boost-users mailing list
> Boost-users@lists.boost.org
> https://lists.boost.org/mailman/listinfo.cgi/boost-users
>
_______________________________________________
Boost-users mailing list
Boost-users@lists.boost.org
https://lists.boost.org/mailman/listinfo.cgi/boost-users

Reply via email to