pitrou commented on a change in pull request #9095: URL: https://github.com/apache/arrow/pull/9095#discussion_r565175198
########## File path: cpp/src/arrow/util/iterator.h ########## @@ -186,6 +187,122 @@ class Iterator : public util::EqualityComparable<Iterator<T>> { Result<T> (*next_)(void*) = NULLPTR; }; +template <typename T> +struct TransformFlow { + using YieldValueType = T; + + TransformFlow(YieldValueType value, bool ready_for_next) + : finished_(false), + ready_for_next_(ready_for_next), + yield_value_(std::move(value)) {} + TransformFlow(bool finished, bool ready_for_next) + : finished_(finished), ready_for_next_(ready_for_next), yield_value_() {} + + bool HasValue() const { return yield_value_.has_value(); } + bool Finished() const { return finished_; } + bool ReadyForNext() const { return ready_for_next_; } + T Value() const { return *yield_value_; } + + bool finished_ = false; + bool ready_for_next_ = false; + util::optional<YieldValueType> yield_value_; +}; + +struct TransformFinish { + template <typename T> + operator TransformFlow<T>() && { // NOLINT explicit + return TransformFlow<T>(true, true); + } +}; + +struct TransformSkip { + template <typename T> + operator TransformFlow<T>() && { // NOLINT explicit + return TransformFlow<T>(false, true); + } +}; + +template <typename T> +TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) { + return TransformFlow<T>(std::move(value), ready_for_next); +} + +template <typename T, typename V> +using Transformer = std::function<Result<TransformFlow<V>>(T)>; + +template <typename T, typename V> +class TransformIterator { + public: + explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer) + : it_(std::move(it)), + transformer_(std::move(transformer)), + last_value_(), + finished_() {} + + // Calls the transform function on the current value. Can return in several ways + // * If the next value is requested (e.g. skip) it will return an empty optional + // * If an invalid status is encountered that will be returned + // * If finished it will return IterationTraits<V>::End() + // * If a value is returned by the transformer that will be returned + Result<util::optional<V>> Pump() { + if (!finished_ && last_value_.has_value()) { + ARROW_ASSIGN_OR_RAISE(TransformFlow<V> next, transformer_(*last_value_)); + if (next.ReadyForNext()) { + if (*last_value_ == IterationTraits<T>::End()) { + finished_ = true; + } + last_value_.reset(); + } + if (next.Finished()) { + finished_ = true; + } + if (next.HasValue()) { + return next.Value(); + } + } + if (finished_) { + return IterationTraits<V>::End(); + } + return util::nullopt; + } + + Result<V> Next() { + while (!finished_) { + ARROW_ASSIGN_OR_RAISE(util::optional<V> next, Pump()); + if (next.has_value()) { + return *next; + } + ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next()); + } + return IterationTraits<V>::End(); + } + + private: + Iterator<T> it_; + Transformer<T, V> transformer_; + util::optional<T> last_value_; + bool finished_ = false; +}; + +/// \brief Transforms an iterator according to a transformer, returning a new Iterator. +/// +/// The transformer will be called on each element of the source iterator and for each +/// call it can yield a value, skip, or finish the iteration. When yielding a value the +/// transformer can choose to consume the source item (the default, ready_for_next = true) +/// or to keep it and it will be called again on the same value. Review comment: I would expect the decompression iterator to do its own internal buffering instead of asking for the same item several times. This is how `CompressedInputStream` already works. The decompression iterator could be factored out from that. (also see `TransformInputStream` :-)) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org