marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955629720
##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
return Loop(LoopBody{std::move(generator), std::move(vec)});
}
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future
+template <typename T, typename ApplyFn,
+ typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+ typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun,
+ internal::Executor* cpu_exec) {
+ struct State {
+ explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+ internal::Executor* cpu_exec_)
+ : source_gen(std::move(source_gen_)),
+ apply_fun(std::move(apply_fun_)),
+ cpu_exec(cpu_exec_),
+ finished(false) {}
+
+ AsyncGenerator<T> source_gen;
+ ApplyFn apply_fun;
+ internal::Executor* cpu_exec;
+ bool finished;
+ };
+
+ auto state =
+ std::make_shared<State>(std::move(source_gen), std::move(apply_fun), cpu_exec);
+ return [state]() {
+ CallbackOptions options;
+ options.executor = state->cpu_exec;
+ options.should_schedule = ShouldSchedule::Always;
+
+ return state->source_gen().Then(
+ [state](const T& next) -> Result<V> {
+ if (IsIterationEnd(next)) {
+ return IterationTraits<V>::End();
+ } else {
+ auto value = state->apply_fun(next);
+ if (!value.ok()) {
+ return Status::NotImplemented("not implemented");
+ } else {
+ return value.ValueOrDie();
+ }
Review Comment:
As opposed to just finishing the generator? It needs to return a non-OK
status here, although the error message should be fleshed out more. If the
generator simply finishes instead of returning a non-OK status, some of the
tests will fail. For example, if you read an invalid CSV, the parse task
returns a non-OK status, and that status has to propagate up through this
generator so that dataset inspection (or a similar call) also reports a
non-OK status.
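
To make the distinction concrete, here is a minimal, synchronous sketch of
the behaviour described above. It is not the PR's code and does not use
Arrow: `Status`, `Result`, `Generator`, `MakeApplySketch`, `Collect`, and
`RunDemo` are all hypothetical stand-ins, assumed only for illustration. The
point it shows is that a failed apply step forwards its own status (which
would also give a more useful message than a generic "not implemented")
rather than ending the stream.

```cpp
#include <cstddef>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, minimal stand-ins for arrow::Status and arrow::Result.
class Status {
 public:
  Status() = default;
  static Status OK() { return Status(); }
  static Status Invalid(std::string msg) { return Status(std::move(msg)); }
  bool ok() const { return ok_; }
  const std::string& message() const { return message_; }

 private:
  explicit Status(std::string msg) : ok_(false), message_(std::move(msg)) {}
  bool ok_ = true;
  std::string message_;
};

template <typename T>
class Result {
 public:
  Result(T value) : value_(std::move(value)) {}
  Result(Status status) : status_(std::move(status)) {}
  bool ok() const { return status_.ok(); }
  const Status& status() const { return status_; }
  const T& value() const { return *value_; }

 private:
  Status status_;
  std::optional<T> value_;
};

// Synchronous stand-in for a generator: std::nullopt marks end of iteration.
template <typename T>
using Generator = std::function<Result<std::optional<T>>()>;

// Wraps `source` so that `apply` runs on each item. A failed `apply` is
// forwarded with its original status instead of ending the stream.
template <typename T, typename V>
Generator<V> MakeApplySketch(Generator<T> source,
                             std::function<Result<V>(const T&)> apply) {
  return [source = std::move(source),
          apply = std::move(apply)]() -> Result<std::optional<V>> {
    Result<std::optional<T>> next = source();
    if (!next.ok()) return next.status();                      // upstream error
    if (!next.value().has_value()) return std::optional<V>{};  // end marker
    Result<V> applied = apply(*next.value());
    if (!applied.ok()) return applied.status();  // keep the real error
    return std::optional<V>(applied.value());
  };
}

// Drains a generator; stops at the end marker or at the first error.
template <typename V>
Status Collect(const Generator<V>& gen, std::vector<V>* out) {
  while (true) {
    Result<std::optional<V>> next = gen();
    if (!next.ok()) return next.status();
    if (!next.value().has_value()) return Status::OK();
    out->push_back(*next.value());
  }
}

// Demo: the second row cannot be parsed, so the caller sees a non-OK status.
Status RunDemo(std::vector<int>* out) {
  std::vector<std::string> rows = {"1", "x", "3"};
  std::size_t i = 0;
  Generator<std::string> source =
      [rows, i]() mutable -> Result<std::optional<std::string>> {
    if (i == rows.size()) return std::optional<std::string>{};
    return std::optional<std::string>(rows[i++]);
  };
  auto parse = [](const std::string& s) -> Result<int> {
    if (s.empty() || s.find_first_not_of("0123456789") != std::string::npos) {
      return Status::Invalid("cannot parse '" + s + "' as an integer");
    }
    return std::stoi(s);
  };
  Generator<int> mapped = MakeApplySketch<std::string, int>(source, parse);
  return Collect(mapped, out);
}
```

Calling `RunDemo` with an empty vector collects the first row and then
returns the parse error. Had the wrapper returned the end marker on failure
instead, `Collect` would report success with a silently truncated result,
which is the outcome the comment above warns about.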
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.