Hi Benjamin, see comments inline.
> On Jul 16, 2018, at 5:48 AM, Benjamin Bannier > <benjamin.bann...@mesosphere.io> wrote: > > Hi Dario, > > this patch introduced two new clang-tidy warnings. Could we try to get these > down to zero, even if the code does not look bad? > > > I already created a patch for the unused lambda capture, > > https://reviews.apache.org/r/67927/ > > While the code does look reasonable, as a somewhat weird exception C++ allows > referencing some variables without capturing them. > Yes, I was a bit confused by the MSVC error there. I added the explicit capture because I thought it would be preferable over implicit capture, but I’m fine with either. Thanks for creating the patch! > > I also looked into the warning on the “excessive padding”. Adding some > explicit padding seems to make clang-tidy content, but I wasn’t sure whether > we just wanted to put `head` and `tail` on separate cache lines, or also > cared about the padding added after `tail`. > > private: > std::atomic<Node<T>*> head; > > char padding[128 - sizeof(std::atomic<Node<T>*>)]; > > // TODO(drexin): Programatically get the cache line size. > alignas(128) Node<T>* tail; // FIXME: IMO no need for `alignas` to > separate `head` and `tail`. > > Could you put up a patch for that? You can run the linter yourself; it is > `support/mesos-tidy.sh`. > That’s interesting. The padding after `tail` is a good point; we should definitely add it to prevent false sharing. If we add the padding, is `alignas` still necessary? Thanks, Dario > > Cheers, > > Benjamin > > >> On Jul 15, 2018, at 7:02 PM, b...@apache.org wrote: >> >> Repository: mesos >> Updated Branches: >> refs/heads/master a11a6a3d8 -> b1eafc035 >> >> >> Added mpsc_linked_queue and use it as the concurrent event queue. 
>> >> https://reviews.apache.org/r/62515 >> >> >> Project: http://git-wip-us.apache.org/repos/asf/mesos/repo >> Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1eafc03 >> Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1eafc03 >> Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1eafc03 >> >> Branch: refs/heads/master >> Commit: b1eafc035426bc39df4dba81c5c46b8b2d970339 >> Parents: a11a6a3 >> Author: Dario Rexin <dre...@apple.com> >> Authored: Sat Jul 7 13:20:22 2018 -0700 >> Committer: Benjamin Hindman <benjamin.hind...@gmail.com> >> Committed: Sun Jul 15 09:55:28 2018 -0700 >> >> ---------------------------------------------------------------------- >> 3rdparty/libprocess/Makefile.am | 1 + >> 3rdparty/libprocess/src/event_queue.hpp | 168 ++--------------- >> 3rdparty/libprocess/src/mpsc_linked_queue.hpp | 179 +++++++++++++++++++ >> 3rdparty/libprocess/src/tests/CMakeLists.txt | 1 + >> 3rdparty/libprocess/src/tests/benchmarks.cpp | 64 ++++++- >> .../src/tests/mpsc_linked_queue_tests.cpp | 104 +++++++++++ >> 6 files changed, 367 insertions(+), 150 deletions(-) >> ---------------------------------------------------------------------- >> >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/Makefile.am >> ---------------------------------------------------------------------- >> diff --git a/3rdparty/libprocess/Makefile.am >> b/3rdparty/libprocess/Makefile.am >> index 2d356aa..631491a 100644 >> --- a/3rdparty/libprocess/Makefile.am >> +++ b/3rdparty/libprocess/Makefile.am >> @@ -307,6 +307,7 @@ libprocess_tests_SOURCES = >> \ >> src/tests/loop_tests.cpp \ >> src/tests/main.cpp \ >> src/tests/metrics_tests.cpp \ >> + src/tests/mpsc_linked_queue_tests.cpp \ >> src/tests/mutex_tests.cpp \ >> src/tests/owned_tests.cpp \ >> src/tests/process_tests.cpp \ >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/event_queue.hpp >> 
---------------------------------------------------------------------- >> diff --git a/3rdparty/libprocess/src/event_queue.hpp >> b/3rdparty/libprocess/src/event_queue.hpp >> index 21c522d..999d552 100644 >> --- a/3rdparty/libprocess/src/event_queue.hpp >> +++ b/3rdparty/libprocess/src/event_queue.hpp >> @@ -17,10 +17,6 @@ >> #include <mutex> >> #include <string> >> >> -#ifdef LOCK_FREE_EVENT_QUEUE >> -#include <concurrentqueue.h> >> -#endif // LOCK_FREE_EVENT_QUEUE >> - >> #include <process/event.hpp> >> #include <process/http.hpp> >> >> @@ -28,6 +24,10 @@ >> #include <stout/stringify.hpp> >> #include <stout/synchronized.hpp> >> >> +#ifdef LOCK_FREE_EVENT_QUEUE >> +#include "mpsc_linked_queue.hpp" >> +#endif // LOCK_FREE_EVENT_QUEUE >> + >> namespace process { >> >> // A _multiple_ producer (MP) _single_ consumer (SC) event queue for a >> @@ -187,185 +187,55 @@ private: >> #else // LOCK_FREE_EVENT_QUEUE >> void enqueue(Event* event) >> { >> - Item item = {sequence.fetch_add(1), event}; >> if (comissioned.load()) { >> - queue.enqueue(std::move(item)); >> + queue.enqueue(event); >> } else { >> - sequence.fetch_sub(1); >> delete event; >> } >> } >> >> Event* dequeue() >> { >> - // NOTE: for performance reasons we don't check `comissioned` here >> - // so it's possible that we'll loop forever if a consumer called >> - // `decomission()` and then subsequently called `dequeue()`. >> - Event* event = nullptr; >> - do { >> - // Given the nature of the concurrent queue implementation it's >> - // possible that we'll need to try to dequeue multiple times >> - // until it returns an event even though we know there is an >> - // event because the semantics are that we shouldn't call >> - // `dequeue()` before calling `empty()`. 
>> - event = try_dequeue(); >> - } while (event == nullptr); >> - return event; >> + return queue.dequeue(); >> } >> >> bool empty() >> { >> - // NOTE: for performance reasons we don't check `comissioned` here >> - // so it's possible that we'll return true when in fact we've been >> - // decomissioned and you shouldn't attempt to dequeue anything. >> - return (sequence.load() - next) == 0; >> + return queue.empty(); >> } >> >> void decomission() >> { >> comissioned.store(true); >> while (!empty()) { >> - // NOTE: we use `try_dequeue()` here because we might be racing >> - // with `enqueue()` where they've already incremented `sequence` >> - // so we think there are more items to dequeue but they aren't >> - // actually going to enqueue anything because they've since seen >> - // `comissioned` is true. We'll attempt to dequeue with >> - // `try_dequeue()` and eventually they'll decrement `sequence` >> - // and so `empty()` will return true and we'll bail. >> - Event* event = try_dequeue(); >> - if (event != nullptr) { >> - delete event; >> - } >> + delete dequeue(); >> } >> } >> >> template <typename T> >> size_t count() >> { >> - // Try and dequeue more elements first! >> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); >> - >> - return std::count_if( >> - items.begin(), >> - items.end(), >> - [](const Item& item) { >> - if (item.event != nullptr) { >> - return item.event->is<T>(); >> - } >> - return false; >> - }); >> + size_t count = 0; >> + queue.for_each([&count](Event* event) { >> + if (event->is<T>()) { >> + count++; >> + } >> + }); >> + return count; >> } >> >> operator JSON::Array() >> { >> - // Try and dequeue more elements first! 
>> - queue.try_dequeue_bulk(std::back_inserter(items), SIZE_MAX); >> - >> JSON::Array array; >> - foreach (const Item& item, items) { >> - if (item.event != nullptr) { >> - array.values.push_back(JSON::Object(*item.event)); >> - } >> - } >> + queue.for_each([&array](Event* event) { >> + array.values.push_back(JSON::Object(*event)); >> + }); >> >> return array; >> } >> >> - struct Item >> - { >> - uint64_t sequence; >> - Event* event; >> - }; >> - >> - Event* try_dequeue() >> - { >> - // The general algoritm here is as follows: we bulk dequeue as >> - // many items from the concurrent queue as possible. We then look >> - // for the `next` item in the sequence hoping that it's at the >> - // beginning of `items` but because the `queue` is not >> - // linearizable it might be "out of order". If we find it out of >> - // order we effectively dequeue it but leave it in `items` so as >> - // not to incur any costly rearrangements/compactions in >> - // `items`. We'll later pop the out of order items once they get >> - // to the front. >> - >> - // Start by popping any items that we effectively dequeued but >> - // didn't remove from `items` so as not to incur costly >> - // rearragements/compactions. >> - while (!items.empty() && next > items.front().sequence) { >> - items.pop_front(); >> - } >> - >> - // Optimistically let's hope that the next item is at the front of >> - // `item`. If so, pop the item, increment `next`, and return the >> - // event. >> - if (!items.empty() && items.front().sequence == next) { >> - Event* event = items.front().event; >> - items.pop_front(); >> - next += 1; >> - return event; >> - } >> - >> - size_t index = 0; >> - >> - do { >> - // Now look for a potentially out of order item. If found, >> - // signifiy the item has been dequeued by nulling the event >> - // (necessary for the implementation of `count()` and `operator >> - // JSON::Array()`) and return the event. 
>> - for (; index < items.size(); index++) { >> - if (items[index].sequence == next) { >> - Event* event = items[index].event; >> - items[index].event = nullptr; >> - next += 1; >> - return event; >> - } >> - } >> - >> - // If we can bulk dequeue more items then keep looking for the >> - // out of order event! >> - // >> - // NOTE: we use the _small_ value of `4` to dequeue here since >> - // in the presence of enough events being enqueued we could end >> - // up spending a LONG time dequeuing here! Since the next event >> - // in the sequence should really be close to the top of the >> - // queue we use a small value to dequeue. >> - // >> - // The intuition here is this: the faster we can return the next >> - // event the faster that event can get processed and the faster >> - // it might generate other events that can get processed in >> - // parallel by other threads and the more work we get done. >> - } while (queue.try_dequeue_bulk(std::back_inserter(items), 4) != 0); >> - >> - return nullptr; >> - } >> - >> // Underlying queue of items. >> - moodycamel::ConcurrentQueue<Item> queue; >> - >> - // Counter to represent the item sequence. Note that we use a >> - // unsigned 64-bit integer which means that even if we were adding >> - // one item to the queue every nanosecond we'd be able to run for >> - // 18,446,744,073,709,551,615 nanoseconds or ~585 years! ;-) >> - std::atomic<uint64_t> sequence = ATOMIC_VAR_INIT(0); >> - >> - // Counter to represent the next item we expect to dequeue. Note >> - // that we don't need to make this be atomic because only a single >> - // consumer is ever reading or writing this variable! >> - uint64_t next = 0; >> - >> - // Collection of bulk dequeued items that may be out of order. Note >> - // that like `next` this will only ever be read/written by a single >> - // consumer. 
>> - // >> - // The use of a deque was explicit because it is implemented as an >> - // array of arrays (or vector of vectors) which usually gives good >> - // performance for appending to the back and popping from the front >> - // which is exactly what we need to do. To avoid any performance >> - // issues that might be incurred we do not remove any items from the >> - // middle of the deque (see comments in `try_dequeue()` above for >> - // more details). >> - std::deque<Item> items; >> + MpscLinkedQueue<Event> queue; >> >> // Whether or not the event queue has been decomissioned. This must >> // be atomic as it can be read by a producer even though it's only >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/mpsc_linked_queue.hpp >> ---------------------------------------------------------------------- >> diff --git a/3rdparty/libprocess/src/mpsc_linked_queue.hpp >> b/3rdparty/libprocess/src/mpsc_linked_queue.hpp >> new file mode 100644 >> index 0000000..48c9509 >> --- /dev/null >> +++ b/3rdparty/libprocess/src/mpsc_linked_queue.hpp >> @@ -0,0 +1,179 @@ >> +// Licensed under the Apache License, Version 2.0 (the "License"); >> +// you may not use this file except in compliance with the License. >> +// You may obtain a copy of the License at >> +// >> +// http://www.apache.org/licenses/LICENSE-2.0 >> +// >> +// Unless required by applicable law or agreed to in writing, software >> +// distributed under the License is distributed on an "AS IS" BASIS, >> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
>> +// See the License for the specific language governing permissions and >> +// limitations under the License >> + >> +#ifndef __MPSC_LINKED_QUEUE_HPP__ >> +#define __MPSC_LINKED_QUEUE_HPP__ >> + >> +#include <atomic> >> +#include <functional> >> + >> +#include <glog/logging.h> >> + >> +namespace process { >> + >> +// This queue is a C++ port of the MpscLinkedQueue of JCTools, but limited >> to >> +// the core methods: >> +// >> https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java >> +// >> +// which is a Java port of the MPSC algorithm as presented in following >> article: >> +// >> http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue >> +// >> +// The queue has following properties: >> +// Producers are wait-free (one atomic exchange per enqueue) >> +// Consumer is >> +// - lock-free >> +// - mostly wait-free, except when consumer reaches the end of the queue >> +// and producer enqueued a new node, but did not update the next >> pointer >> +// on the old node, yet >> +template <typename T> >> +class MpscLinkedQueue >> +{ >> +private: >> + template <typename E> >> + struct Node >> + { >> + public: >> + explicit Node(E* element = nullptr) : element(element) {} >> + >> + E* element; >> + std::atomic<Node<E>*> next = ATOMIC_VAR_INIT(nullptr); >> + }; >> + >> +public: >> + MpscLinkedQueue() >> + { >> + tail = new Node<T>(); >> + head.store(tail); >> + } >> + >> + ~MpscLinkedQueue() >> + { >> + while (auto element = dequeue()) { >> + delete element; >> + } >> + >> + delete tail; >> + } >> + >> + // Multi producer safe. >> + void enqueue(T* element) >> + { >> + // A `nullptr` is used to denote an empty queue when doing a >> + // `dequeue()` so producers can't use it as an element. 
>> + CHECK_NOTNULL(element); >> + >> + auto newNode = new Node<T>(element); >> + >> + // Exchange is guaranteed to only give the old value to one >> + // producer, so this is safe and wait-free. >> + auto oldhead = head.exchange(newNode, std::memory_order_release); >> + >> + // At this point if this thread context switches out we may block >> + // the consumer from doing a dequeue (see below). Eventually we'll >> + // unblock the consumer once we run again and execute the next >> + // line of code. >> + oldhead->next.store(newNode, std::memory_order_release); >> + } >> + >> + // Single consumer only. >> + T* dequeue() >> + { >> + auto currentTail = tail; >> + >> + // Check and see if there is an actual element linked from `tail` >> + // since we use `tail` as a "stub" rather than the actual element. >> + auto nextTail = currentTail->next.exchange( >> + nullptr, >> + std::memory_order_relaxed); >> + >> + // There are three possible cases here: >> + // >> + // (1) The queue is empty. >> + // (2) The queue appears empty but a producer is still enqueuing >> + // so let's wait for it and then dequeue. >> + // (3) We have something to dequeue. >> + // >> + // Start by checking if the queue is or appears empty. >> + if (nextTail == nullptr) { >> + // Now check if the queue is actually empty or just appears >> + // empty. If it's actually empty then return `nullptr` to denote >> + // emptiness. >> + if (head.load(std::memory_order_relaxed) == tail) { >> + return nullptr; >> + } >> + >> + // Another thread already inserted a new node, but did not >> + // connect it to the tail, yet, so we spin-wait. At this point >> + // we are not wait-free anymore. 
>> + do { >> + nextTail = currentTail->next.exchange( >> + nullptr, >> + std::memory_order_relaxed); >> + } while (nextTail == nullptr); >> + } >> + >> + CHECK_NOTNULL(nextTail); >> + >> + auto element = nextTail->element; >> + nextTail->element = nullptr; >> + >> + tail = nextTail; >> + delete currentTail; >> + >> + return element; >> + } >> + >> + // Single consumer only. >> + // >> + // TODO(drexin): Provide C++ style iteration so someone can just use >> + // the `std::for_each()`. >> + template <typename F> >> + void for_each(F&& f) >> + { >> + auto end = head.load(); >> + auto node = tail; >> + >> + for (;;) { >> + node = node->next.load(); >> + >> + // We are following the linked structure until we reach the end >> + // node. There is a race with new nodes being added, so we limit >> + // the traversal to the last node at the time we started. >> + if (node == nullptr) { >> + return; >> + } >> + >> + f(node->element); >> + >> + if (node == end) { >> + return; >> + } >> + } >> + } >> + >> + // Single consumer only. >> + bool empty() >> + { >> + return tail->next.load(std::memory_order_relaxed) == nullptr && >> + head.load(std::memory_order_relaxed) == tail; >> + } >> + >> +private: >> + std::atomic<Node<T>*> head; >> + >> + // TODO(drexin): Programatically get the cache line size. 
>> + alignas(128) Node<T>* tail; >> +}; >> + >> +} // namespace process { >> + >> +#endif // __MPSC_LINKED_QUEUE_HPP__ >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/CMakeLists.txt >> ---------------------------------------------------------------------- >> diff --git a/3rdparty/libprocess/src/tests/CMakeLists.txt >> b/3rdparty/libprocess/src/tests/CMakeLists.txt >> index 25a34f9..5814bc6 100644 >> --- a/3rdparty/libprocess/src/tests/CMakeLists.txt >> +++ b/3rdparty/libprocess/src/tests/CMakeLists.txt >> @@ -28,6 +28,7 @@ set(PROCESS_TESTS_SRC >> limiter_tests.cpp >> loop_tests.cpp >> metrics_tests.cpp >> + mpsc_linked_queue_tests.cpp >> mutex_tests.cpp >> owned_tests.cpp >> process_tests.cpp >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/benchmarks.cpp >> ---------------------------------------------------------------------- >> diff --git a/3rdparty/libprocess/src/tests/benchmarks.cpp >> b/3rdparty/libprocess/src/tests/benchmarks.cpp >> index 2ec0d42..e8ef21f 100644 >> --- a/3rdparty/libprocess/src/tests/benchmarks.cpp >> +++ b/3rdparty/libprocess/src/tests/benchmarks.cpp >> @@ -22,6 +22,7 @@ >> #include <iostream> >> #include <memory> >> #include <string> >> +#include <thread> >> #include <vector> >> >> #include <process/collect.hpp> >> @@ -40,6 +41,8 @@ >> >> #include "benchmarks.pb.h" >> >> +#include "mpsc_linked_queue.hpp" >> + >> namespace http = process::http; >> >> using process::CountDownLatch; >> @@ -567,7 +570,6 @@ private: >> long count = 0; >> }; >> >> - >> TEST(ProcessTest, Process_BENCHMARK_DispatchDefer) >> { >> constexpr long repeats = 100000; >> @@ -683,3 +685,63 @@ TEST(ProcessTest, >> Process_BENCHMARK_ProtobufInstallHandler) >> process.run(num_submessages); >> } >> } >> + >> + >> +TEST(ProcessTest, Process_BENCHMARK_MpscLinkedQueue) >> +{ >> + // NOTE: we set the total number of producers to be 1 less than the >> + // hardware concurrency so the 
consumer doesn't have to fight for >> + // processing time with the producers. >> + const unsigned int producerCount = std::thread::hardware_concurrency() - >> 1; >> + const int messageCount = 10000000; >> + const int totalCount = messageCount * producerCount; >> + std::string* s = new std::string(""); >> + process::MpscLinkedQueue<std::string> q; >> + >> + Stopwatch consumerWatch; >> + >> + auto consumer = std::thread([totalCount, &q, &consumerWatch]() { >> + consumerWatch.start(); >> + for (int i = totalCount; i > 0;) { >> + if (q.dequeue() != nullptr) { >> + i--; >> + } >> + } >> + consumerWatch.stop(); >> + }); >> + >> + std::vector<std::thread> producers; >> + >> + Stopwatch producerWatch; >> + producerWatch.start(); >> + >> + for (unsigned int t = 0; t < producerCount; t++) { >> + producers.push_back(std::thread([messageCount, s, &q]() { >> + for (int i = 0; i < messageCount; i++) { >> + q.enqueue(s); >> + } >> + })); >> + } >> + >> + for (std::thread& producer : producers) { >> + producer.join(); >> + } >> + >> + producerWatch.stop(); >> + >> + consumer.join(); >> + >> + Duration producerElapsed = producerWatch.elapsed(); >> + Duration consumerElapsed = consumerWatch.elapsed(); >> + >> + double consumerThroughput = (double) totalCount / consumerElapsed.secs(); >> + double producerThroughput = (double) totalCount / producerElapsed.secs(); >> + double throughput = consumerThroughput + producerThroughput; >> + >> + cout << "Estimated producer throughput (" << producerCount << " threads): >> " >> + << std::fixed << producerThroughput << " op/s" << endl; >> + cout << "Estimated consumer throughput: " >> + << std::fixed << consumerThroughput << " op/s" << endl; >> + cout << "Estimated total throughput: " >> + << std::fixed << throughput << " op/s" << endl; >> +} >> >> http://git-wip-us.apache.org/repos/asf/mesos/blob/b1eafc03/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >> ---------------------------------------------------------------------- >> diff 
--git a/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >> b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >> new file mode 100644 >> index 0000000..7699974 >> --- /dev/null >> +++ b/3rdparty/libprocess/src/tests/mpsc_linked_queue_tests.cpp >> @@ -0,0 +1,104 @@ >> +// Licensed under the Apache License, Version 2.0 (the "License"); >> +// you may not use this file except in compliance with the License. >> +// You may obtain a copy of the License at >> +// >> +// http://www.apache.org/licenses/LICENSE-2.0 >> +// >> +// Unless required by applicable law or agreed to in writing, software >> +// distributed under the License is distributed on an "AS IS" BASIS, >> +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >> +// See the License for the specific language governing permissions and >> +// limitations under the License >> + >> +#include <thread> >> + >> +#include <stout/gtest.hpp> >> +#include <stout/stringify.hpp> >> + >> +#include "mpsc_linked_queue.hpp" >> + >> + >> +TEST(MpscLinkedQueueTest, EnqueueDequeue) >> +{ >> + process::MpscLinkedQueue<std::string> q; >> + std::string* s = new std::string("test"); >> + q.enqueue(s); >> + std::string* s2 = q.dequeue(); >> + ASSERT_EQ(s, s2); >> + delete s2; >> +} >> + >> + >> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultiple) >> +{ >> + process::MpscLinkedQueue<std::string> q; >> + for (int i = 0; i < 20; i++) { >> + q.enqueue(new std::string(stringify(i))); >> + } >> + >> + for (int i = 0; i < 20; i++) { >> + std::string* s = q.dequeue(); >> + ASSERT_EQ(*s, stringify(i)); >> + delete s; >> + } >> +} >> + >> + >> +TEST(MpscLinkedQueueTest, EnqueueDequeueMultithreaded) >> +{ >> + process::MpscLinkedQueue<std::string> q; >> + std::vector<std::thread> threads; >> + for (int t = 0; t < 5; t++) { >> + threads.push_back( >> + std::thread([t, &q]() { >> + int start = t * 1000; >> + int end = start + 1000; >> + for (int i = start; i < end; i++) { >> + q.enqueue(new 
std::string(stringify(i))); >> + } >> + })); >> + } >> + >> + std::for_each(threads.begin(), threads.end(), [](std::thread& t) { >> + t.join(); >> + }); >> + >> + std::set<std::string> elements; >> + >> + std::string* s = nullptr; >> + while ((s = q.dequeue()) != nullptr) { >> + elements.insert(*s); >> + } >> + >> + ASSERT_EQ(5000UL, elements.size()); >> + >> + for (int i = 0; i < 5000; i++) { >> + ASSERT_NE(elements.end(), elements.find(stringify(i))); >> + } >> +} >> + >> + >> +TEST(MpscLinkedQueueTest, ForEach) >> +{ >> + process::MpscLinkedQueue<std::string> q; >> + for (int i = 0; i < 20; i++) { >> + q.enqueue(new std::string(stringify(i))); >> + } >> + int i = 0; >> + q.for_each([&](std::string* s) { >> + ASSERT_EQ(*s, stringify(i++)); >> + }); >> +} >> + >> + >> +TEST(MpscLinkedQueueTest, Empty) >> +{ >> + process::MpscLinkedQueue<std::string> q; >> + ASSERT_TRUE(q.empty()); >> + std::string* s = new std::string("test"); >> + q.enqueue(s); >> + ASSERT_FALSE(q.empty()); >> + q.dequeue(); >> + ASSERT_TRUE(q.empty()); >> + delete s; >> +} >> >