Hi,

How about using arrow::ipc::StreamDecoder instead of
arrow::ipc::StreamReader?

  class MyListener : public arrow::ipc::Listener {
   public:
    arrow::Status OnRecordBatchDecoded(
        std::shared_ptr<arrow::RecordBatch> record_batch) override {
      ArrowFilter arrow_filter(record_batch);
      arrow_filter.ToCsv();
      return arrow::Status::OK();
    }
  };

  auto listener = std::make_shared<MyListener>();
  arrow::ipc::StreamDecoder decoder(listener);

  while (true) {
    buffer_type_t res = fut.get0();
    if (res.size() == 0) {
      break;
    }
    decoder.Consume(reinterpret_cast<const uint8_t*>(res.get()),
                    res.size());
  }
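
StreamDecoder buffers incomplete IPC data internally, so each
Consume() call can pass an arbitrary slice of bytes; your listener is
only invoked once a complete record batch has been decoded, and you
don't need to align chunk boundaries with record batch boundaries.
Consume() returns an arrow::Status that should be checked.

Note that the snippet assumes ArrowFilter can be constructed from a
std::shared_ptr<arrow::RecordBatch> rather than an InputStream. If it
can't yet, here is a minimal sketch of such a record-batch based path
(keeping only the first column and appending it as CSV);
WriteFirstColumnAsCsv is just a hypothetical helper, not an existing
Arrow API:

  #include <arrow/api.h>
  #include <arrow/csv/writer.h>
  #include <arrow/io/api.h>

  // Hypothetical helper: keep only the first column of the decoded
  // batch and append it as CSV to an already opened output stream.
  arrow::Status WriteFirstColumnAsCsv(
      const std::shared_ptr<arrow::RecordBatch>& record_batch,
      const std::shared_ptr<arrow::io::OutputStream>& output) {
    auto schema = arrow::schema({record_batch->schema()->field(0)});
    auto first_column = arrow::RecordBatch::Make(
        schema, record_batch->num_rows(), {record_batch->column(0)});
    // Each WriteCSV() call emits a header line by default; see
    // arrow::csv::WriteOptions::include_header when appending batch
    // by batch.
    auto options = arrow::csv::WriteOptions::Defaults();
    return arrow::csv::WriteCSV(*first_column, options, output.get());
  }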


See also:

* arrow::ipc::StreamDecoder:
  https://arrow.apache.org/docs/cpp/api/ipc.html#classarrow_1_1ipc_1_1_stream_decoder

* Event-driven reading:
  https://arrow.apache.org/docs/cpp/ipc.html#event-driven-reading

* A real-world usage:
  https://github.com/groonga/groonga/blob/master/lib/arrow.cpp#L1627


Thanks,
-- 
kou

In <CAJdzkC3i=xrmjTcab4qVAcQrjBREKiRbPYeE35y7=olu-1i...@mail.gmail.com>
  "cpp arrow manage stream with incomplete records" on Sat, 2 Jul 2022 20:12:43 
+0200,
  L Ait <lhoussain.aitas...@gmail.com> wrote:

> Hi,
> I need help integrating Arrow C++ into an open source storage project. In
> fact, I have built the C++ library and can call the API.
> 
> I have C++ code that reads data in chunks and then uses an erasure code to
> rebuild the original data.
> The rebuild is done in chunks; at each iteration I can access a buffer of
> rebuilt data.
> My need is to pass this data as a stream to Arrow for processing, then send
> the processed stream.
> 
> For example, if my original file is a CSV and I would like to filter and
> save the first column:
> 
> col1,col2,col3,col4
> a1,b1,c1,d1
> an,bn,cn,dn
> 
> split into 6 chunks of equal size:
> 
> chunk1:
> a1,b1,c1,d1
> ak,bk
> 
> chunk2:
> ck,dk
> ...
> am,bm,cm,dm
> 
> and so on.
> 
> My question is how to use the right StreamReader in Arrow and how it deals
> with incomplete records (lines) at the beginning and end of each chunk?
> 
> Here is a snippet of the code I use:
> buffer_type_t res = fut.get0();
> BOOST_LOG_TRIVIAL(trace)
>     << "RawxBackendReader: Got result with buffer size: " << res.size();
> std::shared_ptr<arrow::io::InputStream> input;
> 
> std::shared_ptr<arrow::io::BufferReader> buffer(new arrow::io::BufferReader(
>     reinterpret_cast<const uint8_t*>(res.get()), res.size()));
> input = buffer;
> BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
> 
> ArrowFilter arrow_filter = ArrowFilter(input);
> arrow_filter.ToCsv();
> 
> 
> result.push_back(std::move(res));
> 
> Thank you
