Hi Benjamin,

See comments inline.

> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier 
> <benjamin.bann...@mesosphere.io> wrote:
> 
> Hi Dario,
> 
> this patch introduced two new clang-tidy warnings. Could we try to get these 
> down to zero, even if the code does not look bad?
> 
> 
> I already created a patch for the unused lambda capture,
> 
>    https://reviews.apache.org/r/67927/
> 
> While the code does look reasonable, as a somewhat weird exception C++ allows 
> referencing some variables without capturing them.
> 

Yes, I was a bit confused by the MSVC error there. I added the explicit capture
because I thought it would be preferable to implicit capture, but I’m fine
with either. Thanks for creating the patch!
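
A minimal sketch of the exception mentioned above, assuming the variable in question is a local `const` integer with a constant initializer (like `messageCount` in the benchmark). Reading its value inside a lambda is not an odr-use, so no capture is needed. The helper `countMessages` and the counter `seen` are made up for illustration and are not part of the patch.

```cpp
#include <cassert>

// Hypothetical helper, not part of the patch.
int countMessages()
{
  // A local `const int` initialized from a constant expression.
  const int messageCount = 1000;

  int seen = 0;

  // `messageCount` is read but not captured: only its value is needed,
  // so this is not an odr-use and no capture is required. `seen` is
  // modified, so it has to be captured by reference.
  auto producer = [&seen]() {
    for (int i = 0; i < messageCount; i++) {
      seen++;
    }
  };

  producer();
  assert(seen == messageCount);
  return seen;
}
```

With an explicit `[messageCount, &seen]` capture list, Clang flags `messageCount` as an unused lambda capture (`-Wunused-lambda-capture`), which would match the warning discussed here.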

> 
> I also looked into the warning on the “excessive padding”. Adding some 
> explicit padding seems to make clang-tidy content, but I wasn’t sure whether 
> we just wanted to put `head` and `tail` on separate cache lines, or also 
> cared about the padding added after `tail`.
> 
>   private:
>      std::atomic<Node<T>*> head;
> 
>      char padding[128 - sizeof(std::atomic<Node<T>*>)];
> 
>      // TODO(drexin): Programatically get the cache line size.
>      alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to separate `head` and `tail`.
> 
> Could you put up a patch for that? You can run the linter yourself; it is 
> `support/mesos-tidy.sh`.
> 

That’s interesting. The padding after `tail` is a good point; we should
definitely add it to prevent false sharing. If we add the padding, is `alignas`
still necessary?
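
One way to look at the question, as a sketch rather than a proposed patch: explicit padding fixes the distance between `head` and `tail`, but it does not control where the object itself starts. The layouts below assume the 128-byte value from the patch; the struct names, `CACHE_LINE`, and `checkAligned` are hypothetical. Whether either layout silences the clang-tidy padding check is not verified here.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed cache line size, taken from the `128` used in the patch.
constexpr std::size_t CACHE_LINE = 128;

struct Node;

// Explicit padding only: `head` and `tail` are 128 bytes apart, but the
// struct is only pointer-aligned, so it may start mid cache line.
struct PaddedOnly
{
  std::atomic<Node*> head;
  char pad0[CACHE_LINE - sizeof(std::atomic<Node*>)];
  Node* tail;
  char pad1[CACHE_LINE - sizeof(Node*)];
};

// Same padding, plus `alignas` on the first member so that the whole
// object starts on a cache line boundary.
struct PaddedAndAligned
{
  alignas(CACHE_LINE) std::atomic<Node*> head;
  char pad0[CACHE_LINE - sizeof(std::atomic<Node*>)];
  Node* tail;
  char pad1[CACHE_LINE - sizeof(Node*)];
};

static_assert(offsetof(PaddedOnly, tail) == CACHE_LINE,
              "tail sits one cache line after head");
static_assert(sizeof(PaddedOnly) == 2 * CACHE_LINE,
              "padding after tail fills the second line");
static_assert(alignof(PaddedOnly) < CACHE_LINE,
              "padding alone does not raise the alignment");
static_assert(alignof(PaddedAndAligned) == CACHE_LINE,
              "alignas raises the alignment of the whole struct");
static_assert(sizeof(PaddedAndAligned) == 2 * CACHE_LINE,
              "no extra padding is introduced");

// Runtime check that an aligned instance starts on a line boundary.
void checkAligned()
{
  PaddedAndAligned queue;
  assert(reinterpret_cast<std::uintptr_t>(&queue) % CACHE_LINE == 0);
}
```

If this reading is right, the padding makes the `alignas` on `tail` redundant, but some `alignas` (on the class or on its first member) is still needed for each field to own a full line.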

Thanks,
Dario

> 
> Cheers,
> 
> Benjamin
> 
> 
>> On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote:
>> 
>> Repository: mesos
>> Updated Branches:
>> refs/heads/master a11a6a3d8 -> b1eafc035
>> 
>> 
>> Added mpsc_linked_queue and use it as the concurrent event queue.
>> 
>> https://reviews.apache.org/r/62515
>> 
>> 
>> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
>> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03
>> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03
>> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03
>> 
>> Branch: refs/heads/master
>> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339
>> Parents: a11a6a3
>> Author: Dario Rexin <dre...@apple.com>
>> Authored: Sat Jul 7 13:20:22 2018 -0700
>> Committer: Benjamin Hindman <benjamin.hind...@gmail.com>
>> Committed: Sun Jul 15 09:55:28 2018 -0700
>> 
>> ----------------------------------------------------------------------
>> 3rdparty/libprocess/Makefile.am                 |   1 +
>> 3rdparty/libprocess/src/event_queue.hpp         | 168 ++---------------
>> 3rdparty/libprocess/src/mpsc_linked_queue.hpp   | 179 +++++++++++++++++++
>> 3rdparty/libprocess/src/tests/CMakeLists.txt    |   1 +
>> 3rdparty/libprocess/src/tests/benchmarks.cpp    |  64 ++++++-
>> .../src/tests/mpsc_linked_queue_tests.cpp       | 104 +++++++++++
>> 6 files changed, 367 insertions(+), 150 deletions(-)
>> ----------------------------------------------------------------------
>> 
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/Makefile.am b/3rdparty/libprocess/Makefile.am
>> index 2d356aa..631491a 100644
>> --- a/3rdparty/libprocess/Makefile.am
>> +++ b/3rdparty/libprocess/Makefile.am
>> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES =                                       \
>>  src/tests/loop_tests.cpp                                    \
>>  src/tests/main.cpp                                          \
>>  src/tests/metrics_tests.cpp                                 \
>> +  src/tests/mpsc_linked_queue_tests.cpp                             \
>>  src/tests/mutex_tests.cpp                                   \
>>  src/tests/owned_tests.cpp                                   \
>>  src/tests/process_tests.cpp                                 \
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/event_queue.hpp b/3rdparty/libprocess/src/event_queue.hpp
>> index 21c522d..999d552 100644
>> --- a/3rdparty/libprocess/src/event_queue.hpp
>> +++ b/3rdparty/libprocess/src/event_queue.hpp
>> @@ -17,10 +17,6 @@
>> #include <mutex>
>> #include <string>
>> 
>> -#ifdef LOCK_FREE_EVENT_QUEUE
>> -#include <concurrentqueue.h>
>> -#endif // LOCK_FREE_EVENT_QUEUE
>> -
>> #include <process/event.hpp>
>> #include <process/http.hpp>
>> 
>> @@ -28,6 +24,10 @@
>> #include <stout/stringify.hpp>
>> #include <stout/synchronized.hpp>
>> 
>> +#ifdef LOCK_FREE_EVENT_QUEUE
>> +#include "mpsc_linked_queue.hpp"
>> +#endif // LOCK_FREE_EVENT_QUEUE
>> +
>> namespace process {
>> 
>> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a
>> @@ -187,185 +187,55 @@ private:
>> #else // LOCK_FREE_EVENT_QUEUE
>>  void enqueue(Event* event)
>>  {
>> -    Item item = {sequence.fetch_add(1), event};
>>    if (comissioned.load()) {
>> -      queue.enqueue(std::move(item));
>> +      queue.enqueue(event);
>>    } else {
>> -      sequence.fetch_sub(1);
>>      delete event;
>>    }
>>  }
>> 
>>  Event* dequeue()
>>  {
>> -    // NOTE: for performance reasons we don't check `comissioned` here
>> -    // so it's possible that we'll loop forever if a consumer called
>> -    // `decomission()` and then subsequently called `dequeue()`.
>> -    Event* event = nullptr;
>> -    do {
>> -      // Given the nature of the concurrent queue implementation it's
>> -      // possible that we'll need to try to dequeue multiple times
>> -      // until it returns an event even though we know there is an
>> -      // event because the semantics are that we shouldn't call
>> -      // `dequeue()` before calling `empty()`.
>> -      event = try_dequeue();
>> -    } while (event == nullptr);
>> -    return event;
>> +    return queue.dequeue();
>>  }
>> 
>>  bool empty()
>>  {
>> -    // NOTE: for performance reasons we don't check `comissioned` here
>> -    // so it's possible that we'll return true when in fact we've been
>> -    // decomissioned and you shouldn't attempt to dequeue anything.
>> -    return (sequence.load() - next) == 0;
>> +    return queue.empty();
>>  }
>> 
>>  void decomission()
>>  {
>>    comissioned.store(true);
>>    while (!empty()) {
>> -      // NOTE: we use `try_dequeue()` here because we might be racing
>> -      // with `enqueue()` where they've already incremented `sequence`
>> -      // so we think there are more items to dequeue but they aren't
>> -      // actually going to enqueue anything because they've since seen
>> -      // `comissioned` is true. We'll attempt to dequeue with
>> -      // `try_dequeue()` and eventually they'll decrement `sequence`
>> -      // and so `empty()` will return true and we'll bail.
>> -      Event* event = try_dequeue();
>> -      if (event != nullptr) {
>> -        delete event;
>> -      }
>> +      delete dequeue();
>>    }
>>  }
>> 
>>  template <typename T>
>>  size_t count()
>>  {
>> -    // Try and dequeue more elements first!
>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>> -    return std::count_if(
>> -        items.begin(),
>> -        items.end(),
>> -        [](const Item& item) {
>> -          if (item.event != nullptr) {
>> -            return item.event->is<T>();
>> -          }
>> -          return false;
>> -        });
>> +    size_t count = 0;
>> +    queue.for_each([&count](Event* event) {
>> +      if (event->is<T>()) {
>> +        count++;
>> +      }
>> +    });
>> +    return count;
>>  }
>> 
>>  operator JSON::Array()
>>  {
>> -    // Try and dequeue more elements first!
>> -    queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX);
>> -
>>    JSON::Array array;
>> -    foreach (const Item& item, items) {
>> -      if (item.event != nullptr) {
>> -        array.values.push_back(JSON::Object(*item.event));
>> -      }
>> -    }
>> +    queue.for_each([&array](Event* event) {
>> +      array.values.push_back(JSON::Object(*event));
>> +    });
>> 
>>    return array;
>>  }
>> 
>> -  struct Item
>> -  {
>> -    uint64_t sequence;
>> -    Event* event;
>> -  };
>> -
>> -  Event* try_dequeue()
>> -  {
>> -    // The general algoritm here is as follows: we bulk dequeue as
>> -    // many items from the concurrent queue as possible. We then look
>> -    // for the `next` item in the sequence hoping that it's at the
>> -    // beginning of `items` but because the `queue` is not
>> -    // linearizable it might be "out of order". If we find it out of
>> -    // order we effectively dequeue it but leave it in `items` so as
>> -    // not to incur any costly rearrangements/compactions in
>> -    // `items`. We'll later pop the out of order items once they get
>> -    // to the front.
>> -
>> -    // Start by popping any items that we effectively dequeued but
>> -    // didn't remove from `items` so as not to incur costly
>> -    // rearragements/compactions.
>> -    while (!items.empty() && next > items.front().sequence) {
>> -      items.pop_front();
>> -    }
>> -
>> -    // Optimistically let's hope that the next item is at the front of
>> -    // `item`. If so, pop the item, increment `next`, and return the
>> -    // event.
>> -    if (!items.empty() && items.front().sequence == next) {
>> -      Event* event = items.front().event;
>> -      items.pop_front();
>> -      next += 1;
>> -      return event;
>> -    }
>> -
>> -    size_t index = 0;
>> -
>> -    do {
>> -      // Now look for a potentially out of order item. If found,
>> -      //  signifiy the item has been dequeued by nulling the event
>> -      //  (necessary for the implementation of `count()` and `operator
>> -      //  JSON::Array()`) and return the event.
>> -      for (; index < items.size(); index++) {
>> -        if (items[index].sequence == next) {
>> -          Event* event = items[index].event;
>> -          items[index].event = nullptr;
>> -          next += 1;
>> -          return event;
>> -        }
>> -      }
>> -
>> -      // If we can bulk dequeue more items then keep looking for the
>> -      // out of order event!
>> -      //
>> -      // NOTE: we use the _small_ value of `4` to dequeue here since
>> -      // in the presence of enough events being enqueued we could end
>> -      // up spending a LONG time dequeuing here! Since the next event
>> -      // in the sequence should really be close to the top of the
>> -      // queue we use a small value to dequeue.
>> -      //
>> -      // The intuition here is this: the faster we can return the next
>> -      // event the faster that event can get processed and the faster
>> -      // it might generate other events that can get processed in
>> -      // parallel by other threads and the more work we get done.
>> -    } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0);
>> -
>> -    return nullptr;
>> -  }
>> -
>>  // Underlying queue of items.
>> -  moodycamel::ConcurrentQueue<Item> queue;
>> -
>> -  // Counter to represent the item sequence. Note that we use a
>> -  // unsigned 64-bit integer which means that even if we were adding
>> -  // one item to the queue every nanosecond we'd be able to run for
>> -  // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-)
>> -  std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0);
>> -
>> -  // Counter to represent the next item we expect to dequeue. Note
>> -  // that we don't need to make this be atomic because only a single
>> -  // consumer is ever reading or writing this variable!
>> -  uint64_t next = 0;
>> -
>> -  // Collection of bulk dequeued items that may be out of order. Note
>> -  // that like `next` this will only ever be read/written by a single
>> -  // consumer.
>> -  //
>> -  // The use of a deque was explicit because it is implemented as an
>> -  // array of arrays (or vector of vectors) which usually gives good
>> -  // performance for appending to the back and popping from the front
>> -  // which is exactly what we need to do. To avoid any performance
>> -  // issues that might be incurred we do not remove any items from the
>> -  // middle of the deque (see comments in `try_dequeue()` above for
>> -  // more details).
>> -  std::deque<Item> items;
>> +  MpscLinkedQueue<Event> queue;
>> 
>>  // Whether or not the event queue has been decomissioned. This must
>>  // be atomic as it can be read by a producer even though it's only
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> new file mode 100644
>> index 0000000..48c9509
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp
>> @@ -0,0 +1,179 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +//     http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#ifndef __MPSC_LINKED_QUEUE_HPP__
>> +#define __MPSC_LINKED_QUEUE_HPP__
>> +
>> +#include <atomic>
>> +#include <functional>
>> +
>> +#include <glog/logging.h>
>> +
>> +namespace process {
>> +
>> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited to
>> +// the core methods:
>> +// https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java
>> +//
>> +// which is a Java port of the MPSC algorithm as presented in following article:
>> +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
>> +//
>> +// The queue has following properties:
>> +//   Producers are wait-free (one atomic exchange per enqueue)
>> +//   Consumer is
>> +//     - lock-free
>> +//     - mostly wait-free, except when consumer reaches the end of the queue
>> +//       and producer enqueued a new node, but did not update the next pointer
>> +//       on the old node, yet
>> +template <typename T>
>> +class MpscLinkedQueue
>> +{
>> +private:
>> +  template <typename E>
>> +  struct Node
>> +  {
>> +  public:
>> +    explicit Node(E* element = nullptr) : element(element) {}
>> +
>> +    E* element;
>> +    std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr);
>> +  };
>> +
>> +public:
>> +  MpscLinkedQueue()
>> +  {
>> +    tail = new Node<T>();
>> +    head.store(tail);
>> +  }
>> +
>> +  ~MpscLinkedQueue()
>> +  {
>> +    while (auto element = dequeue()) {
>> +      delete element;
>> +    }
>> +
>> +    delete tail;
>> +  }
>> +
>> +  // Multi producer safe.
>> +  void enqueue(T* element)
>> +  {
>> +    // A `nullptr` is used to denote an empty queue when doing a
>> +    // `dequeue()` so producers can't use it as an element.
>> +    CHECK_NOTNULL(element);
>> +
>> +    auto newNode = new Node<T>(element);
>> +
>> +    // Exchange is guaranteed to only give the old value to one
>> +    // producer, so this is safe and wait-free.
>> +    auto oldhead = head.exchange(newNode, std::memory_order_release);
>> +
>> +    // At this point if this thread context switches out we may block
>> +    // the consumer from doing a dequeue (see below). Eventually we'll
>> +    // unblock the consumer once we run again and execute the next
>> +    // line of code.
>> +    oldhead->next.store(newNode, std::memory_order_release);
>> +  }
>> +
>> +  // Single consumer only.
>> +  T* dequeue()
>> +  {
>> +    auto currentTail = tail;
>> +
>> +    // Check and see if there is an actual element linked from `tail`
>> +    // since we use `tail` as a "stub" rather than the actual element.
>> +    auto nextTail = currentTail->next.exchange(
>> +        nullptr,
>> +        std::memory_order_relaxed);
>> +
>> +    // There are three possible cases here:
>> +    //
>> +    // (1) The queue is empty.
>> +    // (2) The queue appears empty but a producer is still enqueuing
>> +    //     so let's wait for it and then dequeue.
>> +    // (3) We have something to dequeue.
>> +    //
>> +    // Start by checking if the queue is or appears empty.
>> +    if (nextTail == nullptr) {
>> +      // Now check if the queue is actually empty or just appears
>> +      // empty. If it's actually empty then return `nullptr` to denote
>> +      // emptiness.
>> +      if (head.load(std::memory_order_relaxed) == tail) {
>> +        return nullptr;
>> +      }
>> +
>> +      // Another thread already inserted a new node, but did not
>> +      // connect it to the tail, yet, so we spin-wait. At this point
>> +      // we are not wait-free anymore.
>> +      do {
>> +        nextTail = currentTail->next.exchange(
>> +            nullptr,
>> +            std::memory_order_relaxed);
>> +      } while (nextTail == nullptr);
>> +    }
>> +
>> +    CHECK_NOTNULL(nextTail);
>> +
>> +    auto element = nextTail->element;
>> +    nextTail->element = nullptr;
>> +
>> +    tail = nextTail;
>> +    delete currentTail;
>> +
>> +    return element;
>> +  }
>> +
>> +  // Single consumer only.
>> +  //
>> +  // TODO(drexin): Provide C++ style iteration so someone can just use
>> +  // the `std::for_each()`.
>> +  template <typename F>
>> +  void for_each(F&& f)
>> +  {
>> +    auto end = head.load();
>> +    auto node = tail;
>> +
>> +    for (;;) {
>> +      node = node->next.load();
>> +
>> +      // We are following the linked structure until we reach the end
>> +      // node. There is a race with new nodes being added, so we limit
>> +      // the traversal to the last node at the time we started.
>> +      if (node == nullptr) {
>> +        return;
>> +      }
>> +
>> +      f(node->element);
>> +
>> +      if (node == end) {
>> +        return;
>> +      }
>> +    }
>> +  }
>> +
>> +  // Single consumer only.
>> +  bool empty()
>> +  {
>> +    return tail->next.load(std::memory_order_relaxed) == nullptr &&
>> +      head.load(std::memory_order_relaxed) == tail;
>> +  }
>> +
>> +private:
>> +  std::atomic<Node<T>*> head;
>> +
>> +  // TODO(drexin): Programatically get the cache line size.
>> +  alignas(128) Node<T>* tail;
>> +};
>> +
>> +} // namespace process {
>> +
>> +#endif // __MPSC_LINKED_QUEUE_HPP__
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> index 25a34f9..5814bc6 100644
>> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt
>> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt
>> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC
>>  limiter_tests.cpp
>>  loop_tests.cpp
>>  metrics_tests.cpp
>> +  mpsc_linked_queue_tests.cpp
>>  mutex_tests.cpp
>>  owned_tests.cpp
>>  process_tests.cpp
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> index 2ec0d42..e8ef21f 100644
>> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp
>> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp
>> @@ -22,6 +22,7 @@
>> #include <iostream>
>> #include <memory>
>> #include <string>
>> +#include <thread>
>> #include <vector>
>> 
>> #include <process/collect.hpp>
>> @@ -40,6 +41,8 @@
>> 
>> #include "benchmarks.pb.h"
>> 
>> +#include "mpsc_linked_queue.hpp"
>> +
>> namespace http = process::http;
>> 
>> using process::CountDownLatch;
>> @@ -567,7 +570,6 @@ private:
>>  long count = 0;
>> };
>> 
>> -
>> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer)
>> {
>>  constexpr long repeats = 100000;
>> @@ -683,3 +685,63 @@ TEST(ProcessTest, Process_BENCHMARK_ProtobufInstallHandler)
>>    process.run(num_submessages);
>>  }
>> }
>> +
>> +
>> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue)
>> +{
>> +  // NOTE: we set the total number of producers to be 1 less than the
>> +  // hardware concurrency so the consumer doesn't have to fight for
>> +  // processing time with the producers.
>> +  const unsigned int producerCount = std::thread::hardware_concurrency() - 1;
>> +  const int messageCount = 10000000;
>> +  const int totalCount = messageCount * producerCount;
>> +  std::string* s = new std::string("");
>> +  process::MpscLinkedQueue<std::string> q;
>> +
>> +  Stopwatch consumerWatch;
>> +
>> +  auto consumer = std::thread([totalCount, &q, &consumerWatch]() {
>> +    consumerWatch.start();
>> +    for (int i = totalCount; i > 0;) {
>> +      if (q.dequeue() != nullptr) {
>> +        i--;
>> +      }
>> +    }
>> +    consumerWatch.stop();
>> +  });
>> +
>> +  std::vector<std::thread> producers;
>> +
>> +  Stopwatch producerWatch;
>> +  producerWatch.start();
>> +
>> +  for (unsigned int t = 0; t < producerCount; t++) {
>> +    producers.push_back(std::thread([messageCount, s, &q]() {
>> +      for (int i = 0; i < messageCount; i++) {
>> +        q.enqueue(s);
>> +      }
>> +    }));
>> +  }
>> +
>> +  for (std::thread& producer : producers) {
>> +    producer.join();
>> +  }
>> +
>> +  producerWatch.stop();
>> +
>> +  consumer.join();
>> +
>> +  Duration producerElapsed = producerWatch.elapsed();
>> +  Duration consumerElapsed = consumerWatch.elapsed();
>> +
>> +  double consumerThroughput = (double) totalCount / consumerElapsed.secs();
>> +  double producerThroughput = (double) totalCount / producerElapsed.secs();
>> +  double throughput = consumerThroughput + producerThroughput;
>> +
>> +  cout << "Estimated producer throughput (" << producerCount << " threads): "
>> +       << std::fixed << producerThroughput << " op/s" << endl;
>> +  cout << "Estimated consumer throughput: "
>> +       << std::fixed << consumerThroughput << " op/s" << endl;
>> +  cout << "Estimated total throughput: "
>> +       << std::fixed << throughput << " op/s" << endl;
>> +}
>> 
>> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> ----------------------------------------------------------------------
>> diff --git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> new file mode 100644
>> index 0000000..7699974
>> --- /dev/null
>> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp
>> @@ -0,0 +1,104 @@
>> +// Licensed under the Apache License, Version 2.0 (the "License");
>> +// you may not use this file except in compliance with the License.
>> +// You may obtain a copy of the License at
>> +//
>> +//     http://www.apache.org/licenses/LICENSE-2.0
>> +//
>> +// Unless required by applicable law or agreed to in writing, software
>> +// distributed under the License is distributed on an "AS IS" BASIS,
>> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> +// See the License for the specific language governing permissions and
>> +// limitations under the License
>> +
>> +#include <thread>
>> +
>> +#include <stout/gtest.hpp>
>> +#include <stout/stringify.hpp>
>> +
>> +#include "mpsc_linked_queue.hpp"
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeue)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  std::string* s = new std::string("test");
>> +  q.enqueue(s);
>> +  std::string* s2 = q.dequeue();
>> +  ASSERT_EQ(s, s2);
>> +  delete s2;
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  for (int i = 0; i < 20; i++) {
>> +    q.enqueue(new std::string(stringify(i)));
>> +  }
>> +
>> +  for (int i = 0; i < 20; i++) {
>> +    std::string* s = q.dequeue();
>> +    ASSERT_EQ(*s, stringify(i));
>> +    delete s;
>> +  }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  std::vector<std::thread> threads;
>> +  for (int t = 0; t < 5; t++) {
>> +    threads.push_back(
>> +        std::thread([t, &q]() {
>> +          int start = t * 1000;
>> +          int end = start + 1000;
>> +          for (int i = start; i < end; i++) {
>> +            q.enqueue(new std::string(stringify(i)));
>> +          }
>> +        }));
>> +  }
>> +
>> +  std::for_each(threads.begin(), threads.end(), [](std::thread& t) {
>> +    t.join();
>> +  });
>> +
>> +  std::set<std::string> elements;
>> +
>> +  std::string* s = nullptr;
>> +  while ((s = q.dequeue()) != nullptr) {
>> +    elements.insert(*s);
>> +  }
>> +
>> +  ASSERT_EQ(5000UL, elements.size());
>> +
>> +  for (int i = 0; i < 5000; i++) {
>> +    ASSERT_NE(elements.end(), elements.find(stringify(i)));
>> +  }
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, ForEach)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  for (int i = 0; i < 20; i++) {
>> +    q.enqueue(new std::string(stringify(i)));
>> +  }
>> +  int i = 0;
>> +  q.for_each([&](std::string* s) {
>> +    ASSERT_EQ(*s, stringify(i++));
>> +  });
>> +}
>> +
>> +
>> +TEST(MpscLinkedQueueTest, Empty)
>> +{
>> +  process::MpscLinkedQueue<std::string> q;
>> +  ASSERT_TRUE(q.empty());
>> +  std::string* s = new std::string("test");
>> +  q.enqueue(s);
>> +  ASSERT_FALSE(q.empty());
>> +  q.dequeue();
>> +  ASSERT_TRUE(q.empty());
>> +  delete s;
>> +}
>> 
> 
