http://git-wip-us.apache.org/repos/asf/marmotta/blob/2e32da5d/libraries/ostrich/backend/test/main.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/test/main.cc b/libraries/ostrich/backend/test/main.cc index 40755a6..5e6a52b 100644 --- a/libraries/ostrich/backend/test/main.cc +++ b/libraries/ostrich/backend/test/main.cc @@ -1,7 +1,7 @@ // Copyright 2015 Google Inc. All Rights Reserved. // Author: Sebastian Schaffert <[email protected]> #include <glog/logging.h> -#include "gtest.h" +#include "gtest/gtest.h" // run all tests in the current binary int main(int argc, char **argv) {
http://git-wip-us.apache.org/repos/asf/marmotta/blob/2e32da5d/libraries/ostrich/backend/util/iterator.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/util/iterator.h b/libraries/ostrich/backend/util/iterator.h index 002c24f..0d34a4b 100644 --- a/libraries/ostrich/backend/util/iterator.h +++ b/libraries/ostrich/backend/util/iterator.h @@ -23,6 +23,10 @@ #ifndef MARMOTTA_ITERATOR_H #define MARMOTTA_ITERATOR_H +#include <queue> +#include <mutex> +#include <condition_variable> + namespace marmotta { namespace util { @@ -39,19 +43,14 @@ class CloseableIterator { virtual ~CloseableIterator() {} /** - * Increment iterator to next element. + * Increment iterator to next element and return a reference to this element. */ - virtual CloseableIterator<T>& operator++() = 0; - - /** - * Dereference iterator, returning a reference to the current element. - */ - virtual T& operator*() = 0; + virtual const T& next() = 0; /** - * Dereference iterator, returning a pointer to the current element. + * Return a reference to the current element. */ - virtual T* operator->() = 0; + virtual const T& current() const = 0; /** * Return true in case the iterator has more elements. @@ -68,15 +67,11 @@ class EmptyIterator : public CloseableIterator<T> { public: EmptyIterator() { } - CloseableIterator<T> &operator++() override { - return *this; - } - - T &operator*() override { + const T &next() override { throw std::out_of_range("No more elements"); }; - T *operator->() override { + const T ¤t() const override { throw std::out_of_range("No more elements"); }; @@ -95,23 +90,21 @@ class SingletonIterator : public CloseableIterator<T> { public: SingletonIterator(T& value) : value(value), incremented(false) { } - CloseableIterator<T> &operator++() override { - incremented = true; - return *this; - }; - - T &operator*() override { - if (!incremented) + const T &next() override { + if (!incremented) { + incremented = true; return value; - else + } else { throw std::out_of_range("No more elements"); + } }; - T *operator->() override { - if (!incremented) - return &value; - else + const T ¤t() const override { + if (!incremented) { + return value; + } else { throw std::out_of_range("No more elements"); + } }; bool hasNext() override { @@ -124,7 +117,37 @@ class SingletonIterator : public CloseableIterator<T> { }; +/** + * An iterator wrapping a standard STL collection iterator. + */ +template<typename T> +class CollectionIterator : public CloseableIterator<T> { + public: + CollectionIterator(std::vector<T> values) + : values(values), index(0) { + } + const T& next() override { + index++; + return values[index-1]; + }; + + const T& current() const override { + return values[index-1]; + }; + + bool hasNext() override { + return index < values.size(); + }; + + private: + std::vector<T> values; + int index; +}; + +/** + * An iterator implementation supporting to pass a predicate for filtering values. + */ template<typename T> class FilteringIterator : public CloseableIterator<T> { public: @@ -137,31 +160,20 @@ class FilteringIterator : public CloseableIterator<T> { */ FilteringIterator(CloseableIterator<T>* it, PredicateFn pred) : it(it), pred(pred), nextExists(false) { - // run findNext twice so both current and next are set - findNext(); + // run findNext once so next is set findNext(); } /** * Increment iterator to next element. - */ - CloseableIterator<T>& operator++() override { - findNext(); - return *this; - }; - - /** - * Dereference iterator, returning a reference to the current element. */ - T& operator*() override { - return current; + const T& next() override { + findNext(); + return current_; }; - /** - * Dereference iterator, returning a pointer to the current element. - */ - T* operator->() override { - return ¤t; + const T& current() const override { + return current_; }; /** @@ -176,18 +188,18 @@ class FilteringIterator : public CloseableIterator<T> { std::unique_ptr<CloseableIterator<T>> it; PredicateFn pred; - T current; - T next; + T current_; + T next_; bool nextExists; void findNext() { - current = next; + current_ = next_; nextExists = false; while (it->hasNext()) { - if (pred(**it)) { - next = **it; + next_ = it->next(); + if (pred(next_)) { nextExists = true; break; } @@ -195,6 +207,69 @@ class FilteringIterator : public CloseableIterator<T> { } }; +/** + * A multi-threaded iterator supporting iteration over the results + * successively added by a producer. Blocks while the internal queue + * is empty and the producer is not yet finished reporting. The + * producer has to explicitly call finish() when there are no more + * elements to report. + */ +template<typename T> +class ProducerConsumerIterator : public CloseableIterator<T> { + public: + ProducerConsumerIterator() {} + + void add(const T& value) { + std::unique_lock<std::mutex> lock(mutex); + queue.push(value); + condition.notify_one(); + } + + void finish() { + std::unique_lock<std::mutex> lock(mutex); + finished = true; + condition.notify_all(); + } + + /** + * Increment iterator to next element. + */ + const T& next() override { + if (hasNext()) { + std::unique_lock<std::mutex> lock(mutex); + current_ = queue.front(); + queue.pop(); + } + return current_; + }; + + const T& current() const override { + return current_; + }; + + /** + * Return true in case the iterator has more elements. + */ + bool hasNext() override { + std::unique_lock<std::mutex> lock(mutex); + if (queue.size() > 0) { + return true; + } + if (finished) { + return false; + } + condition.wait(lock); + return hasNext(); + }; + + private: + std::queue<T> queue; + std::mutex mutex; + std::condition_variable condition; + + bool finished; + T current_; +}; } }
