pitrou commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r565175198



##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +187,122 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_ = false;
+  bool ready_for_next_ = false;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<Result<TransformFlow<V>>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  // Calls the transform function on the current value.  Can return in several 
ways
+  // * If the next value is requested (e.g. skip) it will return an empty 
optional
+  // * If an invalid status is encountered that will be returned
+  // * If finished it will return IterationTraits<V>::End()
+  // * If a value is returned by the transformer that will be returned
+  Result<util::optional<V>> Pump() {
+    if (!finished_ && last_value_.has_value()) {
+      ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_));
+      if (next.ReadyForNext()) {
+        if (*last_value_ == IterationTraits<T>::End()) {
+          finished_ = true;
+        }
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::nullopt;
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      ARROW_ASSIGN_OR_RAISE(util::optional<V> next, Pump());
+      if (next.has_value()) {
+        return *next;
+      }
+      ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next());
+    }
+    return IterationTraits<V>::End();
+  }
+
+ private:
+  Iterator<T> it_;
+  Transformer<T, V> transformer_;
+  util::optional<T> last_value_;
+  bool finished_ = false;
+};
+
+/// \brief Transforms an iterator according to a transformer, returning a new 
Iterator.
+///
+/// The transformer will be called on each element of the source iterator and 
for each
+/// call it can yield a value, skip, or finish the iteration.  When yielding a 
value the
+/// transformer can choose to consume the source item (the default, 
ready_for_next = true)
+/// or to keep it and it will be called again on the same value.

Review comment:
       I would expect the decompression iterator to do its own internal 
buffering instead of asking for the same item several times. This is how 
`CompressedInputStream` already works. The decompression iterator could be 
factored out from that.
   
   (also see `TransformInputStream` :-))




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to