jrecuerda commented on issue #11799:
URL: https://github.com/apache/arrow/issues/11799#issuecomment-1017533866


   Thank you very much for the detailed response. I managed to perform the 
operation over a table by using the following snippet:
   ```
   /// Buckets the `duration_histogram.duration` column of `table` into
   /// `bin_size`-wide bins, counts the valid (non-null) values per bin with an
   /// Arrow exec plan, and prints the resulting table to stdout.
   ///
   /// @param table    Input table; must contain `duration_histogram.duration`.
   ///                 It is read through a TableBatchReader constructed from this
   ///                 reference, so it must stay alive for the whole call.
   /// @param bin_size Bin width: each duration is divided by it and cast with
   ///                 unsafe (truncating) CastOptions to an int64 bin index.
   /// @param max      Currently unused; kept for interface compatibility.
   /// @throws std::runtime_error if creating, building, validating, starting or
   ///         running the plan fails, or if the result table cannot be assembled.
   void compute_histogram(const arrow::Table &table, int bin_size, [[maybe_unused]] int max)
   {
       // Turns a failed arrow::Result into an exception (rather than aborting the
       // process like ValueOrDie()) so every failure path is reported the same way.
       auto value_or_throw = [](auto result, const std::string &what) {
           if (!result.ok())
           {
               throw std::runtime_error(what + ": " + result.status().ToString());
           }
           return std::move(result).ValueOrDie();
       };

       auto plan = value_or_throw(cp::ExecPlan::Make(), "Plan creation error");
       auto reader = std::make_shared<arrow::TableBatchReader>(table);

       // Batches are pulled from the reader on the CPU thread pool.
       auto batch_gen = value_or_throw(
           cp::MakeReaderGenerator(std::move(reader), arrow::internal::GetCpuThreadPool()),
           "Reader generator error");

       cp::CountOptions options(cp::CountOptions::ONLY_VALID);
       arrow::AsyncGenerator<util::optional<cp::ExecBatch>> sink_gen;

       // bin index = int64(duration / bin_size), truncating via an unsafe cast.
       auto bin_index = cp::call(
           "cast",
           {cp::call("divide", {cp::field_ref("duration_histogram.duration"), cp::literal(bin_size)})},
           std::make_shared<cp::CastOptions>(cp::CastOptions::Unsafe(arrow::int64())));

       // source -> project (duration, duration_bin) -> grouped count -> sink.
       // AddToPlan returns the last node of the sequence, i.e. the sink.
       auto *sink = value_or_throw(
           cp::Declaration::Sequence({
                   {"source", cp::SourceNodeOptions{table.schema(), batch_gen}},
                   {"project", cp::ProjectNodeOptions{{cp::field_ref("duration_histogram.duration"), bin_index},
                                                      {"duration", "duration_bin"}}},
                   {"aggregate", cp::AggregateNodeOptions{/*aggregates=*/{{"hash_count", &options}},
                                                          /*targets=*/{"duration"},
                                                          /*names=*/{"result"},
                                                          /*keys=*/{"duration_bin"}}},
                   {"sink", cp::SinkNodeOptions{&sink_gen}},
               })
               .AddToPlan(plan.get()),
           "Plan construction error");

       // Use the schema produced by the node feeding the sink (the "result" count
       // plus the "duration_bin" key) instead of a hand-written one: a hand-written
       // schema can silently disagree with the plan's real column names/types, and
       // ToRecordBatch just attaches whatever schema it is given.
       const std::shared_ptr<arrow::Schema> outputSchema = sink->inputs()[0]->output_schema();

       if (auto status = plan->Validate(); !status.ok())
       {
           throw std::runtime_error("Plan validation error: " + status.ToString());
       }

       if (auto status = plan->StartProducing(); !status.ok())
       {
           throw std::runtime_error("Production error: " + status.ToString());
       }

       auto collected_fut = arrow::CollectAsyncGenerator(sink_gen);
       auto batches_fut =
           arrow::AllComplete({plan->finished(), arrow::Future<>(collected_fut)})
               .Then([collected_fut, outputSchema]()
                         -> arrow::Result<std::vector<std::shared_ptr<arrow::RecordBatch>>> {
                   ARROW_ASSIGN_OR_RAISE(auto collected, collected_fut.result());
                   std::vector<std::shared_ptr<arrow::RecordBatch>> out;
                   out.reserve(collected.size());
                   for (auto &maybe_batch : collected)
                   {
                       // Collection stops at the end-of-stream marker (an empty
                       // optional), so each collected element holds a batch.
                       ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch->ToRecordBatch(outputSchema));
                       out.push_back(std::move(batch));
                   }
                   return out;
               });

       // result() blocks until the plan has finished and the batches are converted.
       auto batches = value_or_throw(batches_fut.result(), "Execution error");
       auto outputTable = value_or_throw(arrow::Table::FromRecordBatches(outputSchema, batches),
                                         "Table assembly error");
       std::cout << outputTable->ToString() << "\n";
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to