Hi,

How about using arrow::ipc::StreamDecoder instead of arrow::ipc::StreamReader? StreamDecoder accepts arbitrary-sized byte chunks via Consume() and buffers incomplete IPC messages internally, so your listener is only called once a whole record batch has been decoded. You don't have to care about records that are split across chunk boundaries.
class MyListener : public arrow::ipc::Listener {
 public:
  arrow::Status OnRecordBatchDecoded(
      std::shared_ptr<arrow::RecordBatch> record_batch) override {
    ArrowFilter arrow_filter = ArrowFilter(record_batch);
    arrow_filter.ToCsv();
    return arrow::Status::OK();
  }
};

auto listener = std::make_shared<MyListener>();
arrow::ipc::StreamDecoder decoder(listener);
while (true) {
  buffer_type_t res = fut.get0();
  if (res.size() == 0) {
    break;
  }
  // Consume() returns an arrow::Status; check it so a broken chunk
  // doesn't go unnoticed.
  auto status = decoder.Consume(
      reinterpret_cast<const uint8_t*>(res.get()), res.size());
  if (!status.ok()) {
    // Handle the decode error here.
    break;
  }
}

See also:

* arrow::ipc::StreamDecoder: https://arrow.apache.org/docs/cpp/api/ipc.html#classarrow_1_1ipc_1_1_stream_decoder
* Event-driven reading: https://arrow.apache.org/docs/cpp/ipc.html#event-driven-reading
* A real world usage: https://github.com/groonga/groonga/blob/master/lib/arrow.cpp#L1627

Thanks,
-- 
kou

In <CAJdzkC3i=xrmjTcab4qVAcQrjBREKiRbPYeE35y7=olu-1i...@mail.gmail.com>
  "cpp arrow manage stream with incomplete records" on Sat, 2 Jul 2022 20:12:43 +0200,
  L Ait <lhoussain.aitas...@gmail.com> wrote:

> Hi,
> I need help integrating Arrow C++ into an open source storage project. I
> have built the C++ library and can call its API.
>
> I have C++ code that reads data in chunks and then uses an erasure code to
> rebuild the original data.
> The rebuild is done in chunks; at each iteration I can access a buffer of
> rebuilt data.
> I need to pass this data as a stream to an Arrow process and then send the
> processed stream.
>
> For example, if my original file is a CSV and I would like to filter and
> save the first column:
>
> col1,col2,col3,col4
> a1,b1,c1,d1
> an,bn,cn,dn
>
> split into 6 chunks of equal size:
>
> chunk1:
> a1,b1,c1,d1
> ak,bk
> chunk2:
> ck,dk
> ...
> am,bm,cm,dm
>
> and so on.
>
> My question is how to use the right StreamReader in Arrow, and how it
> deals with incomplete records (lines) at the beginning and end of each
> chunk?
>
> Here is a snippet of the code I use:
>
> buffer_type_t res = fut.get0();
> BOOST_LOG_TRIVIAL(trace) << "RawxBackendReader: Got result with buffer size: "
>                          << res.size();
>
> std::shared_ptr<arrow::io::InputStream> input;
> std::shared_ptr<arrow::io::BufferReader> buffer(new arrow::io::BufferReader(
>     reinterpret_cast<const uint8_t*>(res.get()), res.size()));
> input = buffer;
> BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
>
> ArrowFilter arrow_filter = ArrowFilter(input);
> arrow_filter.ToCsv();
>
> result.push_back(std::move(res));
>
> Thank you
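
P.S. The same Listener also has OnSchemaDecoded() and OnEOS() hooks. They
can be handy if you need to know when the schema arrives or when the stream
ends (for example, to open and close whatever you write the filtered CSV to).
A rough, untested sketch (ArrowFilter is your class; I'm assuming it can
process one record batch at a time):

#include <arrow/api.h>
#include <arrow/ipc/api.h>

class MyListener : public arrow::ipc::Listener {
 public:
  arrow::Status OnSchemaDecoded(std::shared_ptr<arrow::Schema> schema) override {
    // Called once the stream's schema has been decoded: a good place to
    // open the output that the filtered data is written to.
    schema_ = std::move(schema);
    return arrow::Status::OK();
  }

  arrow::Status OnRecordBatchDecoded(
      std::shared_ptr<arrow::RecordBatch> record_batch) override {
    // Every batch passed here is complete; bytes belonging to a partial
    // IPC message stay buffered inside StreamDecoder until a later
    // Consume() call provides the rest.
    ArrowFilter arrow_filter(record_batch);
    arrow_filter.ToCsv();
    return arrow::Status::OK();
  }

  arrow::Status OnEOS() override {
    // End of the IPC stream: flush/close your output here.
    return arrow::Status::OK();
  }

 private:
  std::shared_ptr<arrow::Schema> schema_;
};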