http://git-wip-us.apache.org/repos/asf/marmotta/blob/2e32da5d/libraries/ostrich/backend/test/main.cc
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/test/main.cc b/libraries/ostrich/backend/test/main.cc
index 40755a6..5e6a52b 100644
--- a/libraries/ostrich/backend/test/main.cc
+++ b/libraries/ostrich/backend/test/main.cc
@@ -1,7 +1,7 @@
 // Copyright 2015 Google Inc. All Rights Reserved.
 // Author: Sebastian Schaffert <[email protected]>
 #include <glog/logging.h>
-#include "gtest.h"
+#include "gtest/gtest.h"
 
 // run all tests in the current binary
 int main(int argc, char **argv) {

http://git-wip-us.apache.org/repos/asf/marmotta/blob/2e32da5d/libraries/ostrich/backend/util/iterator.h
----------------------------------------------------------------------
diff --git a/libraries/ostrich/backend/util/iterator.h b/libraries/ostrich/backend/util/iterator.h
index 002c24f..0d34a4b 100644
--- a/libraries/ostrich/backend/util/iterator.h
+++ b/libraries/ostrich/backend/util/iterator.h
@@ -23,6 +23,10 @@
 #ifndef MARMOTTA_ITERATOR_H
 #define MARMOTTA_ITERATOR_H
 
+#include <queue>
+#include <mutex>
+#include <condition_variable>
+
 namespace marmotta {
 namespace util {
 
@@ -39,19 +43,14 @@ class CloseableIterator {
     virtual ~CloseableIterator() {}
 
     /**
-     * Increment iterator to next element.
+     * Increment iterator to next element and return a reference to this element.
     */
-    virtual CloseableIterator<T>& operator++() = 0;
-
-    /**
-     * Dereference iterator, returning a reference to the current element.
-     */
-    virtual T& operator*() = 0;
+    virtual const T& next() = 0;
 
     /**
-     * Dereference iterator, returning a pointer to the current element.
+     * Return a reference to the current element.
      */
-    virtual T* operator->() = 0;
+    virtual const T& current() const = 0;
 
     /**
      * Return true in case the iterator has more elements.
@@ -68,15 +67,11 @@ class EmptyIterator : public CloseableIterator<T> {
  public:
     EmptyIterator() { }
 
-    CloseableIterator<T> &operator++() override {
-        return *this;
-    }
-
-    T &operator*() override {
+    const T &next() override {
         throw std::out_of_range("No more elements");
     };
 
-    T *operator->() override {
+    const T &current() const override {
         throw std::out_of_range("No more elements");
     };
 
@@ -95,23 +90,21 @@ class SingletonIterator : public CloseableIterator<T> {
  public:
     SingletonIterator(T& value) : value(value), incremented(false) { }
 
-    CloseableIterator<T> &operator++() override {
-        incremented = true;
-        return *this;
-    };
-
-    T &operator*() override {
-        if (!incremented)
+    const T &next() override {
+        if (!incremented) {
+            incremented = true;
             return value;
-        else
+        } else {
             throw std::out_of_range("No more elements");
+        }
     };
 
-    T *operator->() override {
-        if (!incremented)
-            return &value;
-        else
+    const T &current() const override {
+        if (!incremented) {
+            return value;
+        } else {
             throw std::out_of_range("No more elements");
+        }
     };
 
     bool hasNext() override {
@@ -124,7 +117,37 @@ class SingletonIterator : public CloseableIterator<T> {
 
 };
 
+/**
+ * An iterator wrapping a standard STL collection iterator.
+ */
+template<typename T>
+class CollectionIterator : public CloseableIterator<T> {
+ public:
+    CollectionIterator(std::vector<T> values)
+            : values(values), index(0) {
+    }
 
+    const T& next() override {
+        index++;
+        return values[index-1];
+    };
+
+    const T& current() const override {
+        return values[index-1];
+    };
+
+    bool hasNext() override {
+        return index < values.size();
+    };
+
+ private:
+    std::vector<T> values;
+    int index;
+};
+
+/**
+ * An iterator implementation supporting to pass a predicate for filtering values.
+ */
 template<typename T>
 class FilteringIterator : public CloseableIterator<T> {
  public:
@@ -137,31 +160,20 @@ class FilteringIterator : public CloseableIterator<T> {
      */
     FilteringIterator(CloseableIterator<T>* it,  PredicateFn pred)
             : it(it), pred(pred), nextExists(false) {
-        // run findNext twice so both current and next are set
-        findNext();
+        // run findNext once so next is set
         findNext();
     }
 
     /**
      * Increment iterator to next element.
-    */
-    CloseableIterator<T>& operator++() override {
-        findNext();
-        return *this;
-    };
-
-    /**
-     * Dereference iterator, returning a reference to the current element.
      */
-    T& operator*() override {
-        return current;
+    const T& next() override {
+        findNext();
+        return current_;
     };
 
-    /**
-     * Dereference iterator, returning a pointer to the current element.
-     */
-    T* operator->() override {
-        return &current;
+    const T& current() const override {
+        return current_;
     };
 
     /**
@@ -176,18 +188,18 @@ class FilteringIterator : public CloseableIterator<T> {
     std::unique_ptr<CloseableIterator<T>> it;
     PredicateFn pred;
 
-    T current;
-    T next;
+    T current_;
+    T next_;
 
     bool nextExists;
 
     void findNext() {
-        current = next;
+        current_ = next_;
         nextExists = false;
 
         while (it->hasNext()) {
-            if (pred(**it)) {
-                next = **it;
+            next_ = it->next();
+            if (pred(next_)) {
                 nextExists = true;
                 break;
             }
@@ -195,6 +207,69 @@ class FilteringIterator : public CloseableIterator<T> {
     }
 };
 
+/**
+ * A multi-threaded iterator supporting iteration over the results
+ * successively added by a producer. Blocks while the internal queue
+ * is empty and the producer is not yet finished reporting. The
+ * producer has to explicitly call finish() when there are no more
+ * elements to report.
+ */
+template<typename T>
+class ProducerConsumerIterator : public CloseableIterator<T> {
+ public:
+    ProducerConsumerIterator() {}
+
+    void add(const T& value) {
+        std::unique_lock<std::mutex> lock(mutex);
+        queue.push(value);
+        condition.notify_one();
+    }
+
+    void finish() {
+        std::unique_lock<std::mutex> lock(mutex);
+        finished = true;
+        condition.notify_all();
+    }
+
+    /**
+     * Increment iterator to next element.
+     */
+    const T& next() override {
+        if (hasNext()) {
+            std::unique_lock<std::mutex> lock(mutex);
+            current_ = queue.front();
+            queue.pop();
+        }
+        return current_;
+    };
+
+    const T& current() const override {
+        return current_;
+    };
+
+    /**
+     * Return true in case the iterator has more elements.
+     */
+    bool hasNext() override {
+        std::unique_lock<std::mutex> lock(mutex);
+        if (queue.size() > 0) {
+            return true;
+        }
+        if (finished) {
+            return false;
+        }
+        condition.wait(lock);
+        return hasNext();
+    };
+
+ private:
+    std::queue<T> queue;
+    std::mutex mutex;
+    std::condition_variable condition;
+
+    bool finished;
+    T current_;
+};
 
 }
 }

Reply via email to