Hi,
How about using arrow::ipc::StreamDecoder instead of
arrow::ipc::StreamReader?

  class MyListener : public arrow::ipc::Listener {
  public:
    // Called once for each complete record batch decoded from the stream.
    arrow::Status
    OnRecordBatchDecoded(std::shared_ptr<arrow::RecordBatch> record_batch)
      override {
      ArrowFilter arrow_filter = ArrowFilter(record_batch);
      arrow_filter.ToCsv();
      return arrow::Status::OK();
    }
  };

  auto listener = std::make_shared<MyListener>();
  arrow::ipc::StreamDecoder decoder(listener);
  while (true) {
    buffer_type_t res = fut.get0();
    if (res.size() == 0) {
      break;
    }
    // Consume() buffers partial data, so a chunk may end mid-message.
    auto status =
      decoder.Consume(reinterpret_cast<const uint8_t*>(res.get()),
                      res.size());
    if (!status.ok()) {
      break;
    }
  }

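
To illustrate the event-driven idea without depending on Arrow, here is a
minimal sketch that does the same kind of buffering for newline-delimited
records. It is not Arrow code: LineDecoder, its callback, and DecodeChunks
are hypothetical names made up for this example. It only shows how a
decoder can keep an incomplete tail between Consume() calls and report a
record once it is complete.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Arrow): buffers partial input across
// Consume() calls and reports each complete line through a callback.
class LineDecoder {
 public:
  using Callback = std::function<void(const std::string&)>;

  explicit LineDecoder(Callback on_line) : on_line_(std::move(on_line)) {}

  // Feed one chunk; it may start or end in the middle of a record.
  void Consume(const char* data, std::size_t size) {
    pending_.append(data, size);
    std::size_t start = 0;
    for (;;) {
      const std::size_t end = pending_.find('\n', start);
      if (end == std::string::npos) break;
      on_line_(pending_.substr(start, end - start));
      start = end + 1;
    }
    pending_.erase(0, start);  // keep only the incomplete tail
  }

  // Call once at end of input to flush a last line that has no newline.
  void Finish() {
    if (!pending_.empty()) {
      on_line_(pending_);
      pending_.clear();
    }
  }

 private:
  Callback on_line_;
  std::string pending_;
};

// Hypothetical driver: feeds every chunk and collects the complete lines.
std::vector<std::string> DecodeChunks(const std::vector<std::string>& chunks) {
  std::vector<std::string> lines;
  LineDecoder decoder(
      [&lines](const std::string& line) { lines.push_back(line); });
  for (const auto& chunk : chunks) {
    decoder.Consume(chunk.data(), chunk.size());
  }
  decoder.Finish();
  return lines;
}
```

With chunks cut in the middle of a line, the callback still sees whole
lines only. The callback plays the role that OnRecordBatchDecoded() plays
for the listener above.
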
See also:
* arrow::ipc::StreamDecoder:
https://arrow.apache.org/docs/cpp/api/ipc.html#classarrow_1_1ipc_1_1_stream_decoder
* Event-driven reading:
https://arrow.apache.org/docs/cpp/ipc.html#event-driven-reading
* A real world usage:
https://github.com/groonga/groonga/blob/master/lib/arrow.cpp#L1627
Thanks,
--
kou
In <CAJdzkC3i=xrmjTcab4qVAcQrjBREKiRbPYeE35y7=olu-1i...@mail.gmail.com>
"cpp arrow manage stream with incomplete records" on Sat, 2 Jul 2022 20:12:43
+0200,
L Ait wrote:
> Hi,
> I need help integrating Arrow C++ into an open source storage project. I
> have already built the C++ library and can call its API.
>
> I have C++ code that reads data in chunks and then uses an erasure code to
> rebuild the original data.
> The rebuild is done chunk by chunk; at each iteration I can access a buffer
> of rebuilt data.
> I need to pass this data as a stream to Arrow for processing and then send
> the processed stream on.
>
> For example, suppose my original file is a CSV and I would like to filter
> it and save only the first column:
>
> col1,col2,col3,col4
> a1,b1,c1,d1
> an,bn,cn,dn
>
> and it is split into 6 chunks of equal size:
>
> chunk1:
> a1,b1,c1,d1
> ak,bk
> chunk2:
>
> ck,dk
> ...
> am,bm,cm,dm
>
> and so on.
>
> My question is which StreamReader in Arrow is the right one to use, and
> how it deals with incomplete records (lines) at the beginning and end of
> each chunk?
>
> Here is a snippet of the code I use:
> buffer_type_t res = fut.get0();
> BOOST_LOG_TRIVIAL(trace) <<
> "RawxBackendReader: Got result with buffer size: " << res.size();
> std::shared_ptr<arrow::io::InputStream> input;
>
> std::shared_ptr<arrow::io::BufferReader> buffer(new arrow::io::BufferReader(
> reinterpret_cast<const uint8_t*>(res.get()), res.size()));
> input = buffer;
> BOOST_LOG_TRIVIAL(trace) << "laa type input" << input.get();
>
> ArrowFilter arrow_filter = ArrowFilter(input);
> arrow_filter.ToCsv();
>
>
> result.push_back(std::move(res));
>
> Thank you