Hello List,

I'm using the boost::interprocess::message_queue in a project and to
verify everything is working I created a test case. In that test case
I'm sending and receiving data via threads.

The test passes on Linux and Windows, both with no sanitizer and with
AddressSanitizer and ThreadSanitizer. I recently added Apple Silicon
(M1) to the test matrix, and there the test run with ThreadSanitizer
enabled fails.

Is boost::interprocess::message_queue thread-safe?
Can the reported TSan error be a false positive?
If that is the case, what test can I run to verify it?

I looked at the code of atomic_cas32(), and it uses the legacy built-in
__sync_val_compare_and_swap().
Could it be that this built-in isn't supported on ARM?
Is there a newer built-in that should be used instead?
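
To make the last question concrete, here is a minimal sketch of the two
built-ins side by side, assuming a GCC- or Clang-compatible compiler. The
helper names cas32_legacy, cas32_builtin and check_cas_helpers are made up
for illustration; this is not the Boost.Interprocess code, and it says
nothing about whether the TSan report is a real race.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: legacy __sync built-in.
// Returns the value that was stored in *mem before the operation.
inline std::uint32_t cas32_legacy(volatile std::uint32_t* mem,
                                  std::uint32_t desired,
                                  std::uint32_t expected)
{
  return __sync_val_compare_and_swap(mem, expected, desired);
}

// Hypothetical helper: newer __atomic built-in with explicit memory orders.
// On failure the built-in writes the current value of *mem into 'expected';
// on success 'expected' already equals the old value, so returning it
// matches the behaviour of the legacy helper above.
inline std::uint32_t cas32_builtin(volatile std::uint32_t* mem,
                                   std::uint32_t desired,
                                   std::uint32_t expected)
{
  __atomic_compare_exchange_n(mem, &expected, desired,
                              false, /* strong compare-exchange */
                              __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
  return expected;
}

// Single-threaded sanity check: both helpers report the old value.
inline void check_cas_helpers()
{
  volatile std::uint32_t word = 0;
  const std::uint32_t old1 = cas32_legacy(&word, 1, 0);   // swaps 0 -> 1
  const std::uint32_t old2 = cas32_builtin(&word, 2, 1);  // swaps 1 -> 2
  const std::uint32_t old3 = cas32_builtin(&word, 9, 0);  // no swap, stays 2
  assert(old1 == 0 && old2 == 1 && old3 == 2 && word == 2);
  (void)old1; (void)old2; (void)old3;  // avoid unused warnings with NDEBUG
}
```

Calling check_cas_helpers() from a small test program only confirms that
both forms compile and behave the same on a given target; it does not
exercise any concurrency.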

The reported TSan error:
-- snip
==================
WARNING: ThreadSanitizer: data race (pid=4146)
  Atomic write of size 4 at 0x000104af4028 by thread T2:
    #0 __tsan_atomic32_compare_exchange_val <null>:81245684 (libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x57854)
    #1 boost::interprocess::ipcdetail::atomic_cas32(unsigned int volatile*, unsigned int, unsigned int) atomic.hpp:581 (tsan_boost:arm64+0x100005b30)

  Previous read of size 4 at 0x000104af4028 by thread T1:
    #0 boost::interprocess::ipcdetail::spin_mutex::try_lock() mutex.hpp:100 (tsan_boost:arm64+0x100006eb0)
    #1 void boost::interprocess::ipcdetail::try_based_lock<boost::interprocess::ipcdetail::spin_mutex>(boost::interprocess::ipcdetail::spin_mutex&) common_algorithms.hpp:62 (tsan_boost:arm64+0x100006e28)

SUMMARY: ThreadSanitizer: data race atomic.hpp:581 in boost::interprocess::ipcdetail::atomic_cas32(unsigned int volatile*, unsigned int, unsigned int)
-- snap

Here is a code example that demonstrates the test case:
-- snip
#include <boost/interprocess/ipc/message_queue.hpp>

#include <cstdint>
#include <iostream>
#include <vector>
#include <thread>
#include <memory>

struct MessageQueueImpl
{
  MessageQueueImpl()
  {
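    // Remove a queue left over from an earlier run; create_only throws otherwise.
    boost::interprocess::message_queue::remove("tsanqueue");

    boost::interprocess::permissions file_permissions;
    file_permissions.set_permissions(0660);
    // Up to 10 messages of at most 15 bytes each.
    queue = std::make_unique<boost::interprocess::message_queue>(
        boost::interprocess::create_only, "tsanqueue", 10, 15, file_permissions);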
  }
  std::unique_ptr<boost::interprocess::message_queue> queue;
};

class MessageQueue
{
public:
  MessageQueue()
    : m_impl(std::make_unique<MessageQueueImpl>())
  {}

  void send(const std::vector<uint8_t>& buffer)
  {
    m_impl->queue->send(buffer.data(), buffer.size(), 0);
  }

  void receive(std::vector<uint8_t>& buffer)
  {
    boost::interprocess::message_queue::size_type received_size = 0;
    buffer.resize(m_impl->queue->get_max_msg_size());

    unsigned int prio = 0;  // receive() takes the priority as unsigned int&
    m_impl->queue->receive(buffer.data(), buffer.size(), received_size, prio);
    buffer.resize(received_size);
  }

private:
  std::unique_ptr<MessageQueueImpl> m_impl;
};

int main()
{
  auto uut = std::make_unique<MessageQueue>();
  std::thread sender([&uut]{
    for(uint8_t i = 0; i < 3; ++i) {
      const std::vector<uint8_t> send_buffer = {i};
      uut->send(send_buffer);
    }
  });
  std::thread receiver([&uut]{
    for(uint8_t i = 0; i < 3; ++i) {
      std::vector<uint8_t> receive_buffer;
      uut->receive(receive_buffer);

      std::cout << "buffer received:\n";
      for (const auto byte : receive_buffer) {
        // Cast so the byte is printed as a number, not as a raw character.
        std::cout << static_cast<int>(byte) << '\n';
      }
    }
  });

  if(sender.joinable()) {
    sender.join();
  }
  if(receiver.joinable()) {
    receiver.join();
  }
  return 0;
}
-- snap

Cheers,
 Jens