[ https://issues.apache.org/jira/browse/ARROW-16705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Victor Bonilla resolved ARROW-16705.
------------------------------------
Resolution: Fixed
> [JavaScript] TypeError: RecordBatchReader.from(...).toNodeStream is not a function
> ----------------------------------------------------------------------------------
>
> Key: ARROW-16705
> URL: https://issues.apache.org/jira/browse/ARROW-16705
> Project: Apache Arrow
> Issue Type: Bug
> Components: JavaScript
> Affects Versions: 8.0.0
> Environment: Nodejs v16.13.0
> Reporter: Victor Bonilla
> Priority: Major
> Labels: async, ipc, javascript, stream
>
> While trying to write a real-time stream from an async iterable of objects to an
> IPC streaming format file, I'm getting a TypeError.
> The idea is to write every message to the Arrow file as soon as it arrives,
> without waiting to build the complete table before streaming it. To take
> advantage of stream event handling, I'm using the functional approach of the
> [node:stream|https://nodejs.org/docs/latest-v16.x/api/stream.html] module
> (Node.js v16.13.0).
> The async iterable yields messages that are JS objects with fields of
> different data types, for example:
> {code:javascript}
> {
>   id: '6345',
>   product: 'foo',
>   price: 62.78,
>   created: '2022-05-01T16:01:00.105Z',
> }
> {code}
> Code to replicate the error:
> {code:javascript}
> const {
>   Struct, Field, Utf8, Float32, TimestampMillisecond,
>   RecordBatchReader, RecordBatchStreamWriter,
>   builderThroughAsyncIterable,
> } = require('apache-arrow');
> const fs = require('fs');
> const path = require('path');
> const { pipeline } = require('node:stream');
>
> // Endless async source that yields one message per second
> const asyncIterable = {
>   [Symbol.asyncIterator]: async function* () {
>     while (true) {
>       const obj = {
>         id: Math.floor(Math.random() * 10).toString(),
>         product: 'foo',
>         price: Math.random() + Math.floor(Math.random() * 10),
>         created: new Date(),
>       };
>       yield obj;
>       // insert some asynchrony
>       await new Promise((r) => setTimeout(r, 1000));
>     }
>   },
> };
>
> async function streamToArrow(messagesAsyncIterable) {
>   // Struct type describing one message
>   const message_type = new Struct([
>     new Field('id', new Utf8, false),
>     new Field('product', new Utf8, false),
>     new Field('price', new Float32, false),
>     new Field('created', new TimestampMillisecond, false),
>   ]);
>   const builderOptions = {
>     type: message_type,
>     nullValues: [null, 'n/a', undefined],
>     highWaterMark: 30,
>     queueingStrategy: 'count',
>   };
>   // Maps an async iterable of objects to an async iterable of Arrow Vectors
>   const transform = builderThroughAsyncIterable(builderOptions);
>   let file_path = './ipc_stream.arrow';
>   const fsWriter = fs.createWriteStream(file_path);
>   pipeline(
>     RecordBatchReader
>       .from(transform(messagesAsyncIterable))
>       // Throws TypeError: RecordBatchReader.from(...).toNodeStream is not a function
>       .toNodeStream(),
>     RecordBatchStreamWriter.throughNode(),
>     fsWriter,
>     (err, value) => {
>       if (err) {
>         console.error(err);
>       } else {
>         console.log(value, 'value returned');
>       }
>     }
>   ).on('close', () => {
>     console.log('Closed pipeline');
>   });
> }
>
> streamToArrow(asyncIterable);
> {code}
>
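A likely cause, based on a reading of the Arrow JS sources that is worth confirming against the 8.0.0 release: when {{RecordBatchReader.from()}} receives an async source such as an async iterable, it returns a Promise that resolves to the reader, so {{.toNodeStream()}} is looked up on the Promise instead of on a reader. The sketch below reproduces that shape using only Node.js built-ins. {{FakeReader}} and {{fromAsync}} are hypothetical stand-ins, not Arrow APIs, and an in-memory {{Writable}} replaces the file sink.

{code:javascript}
const { Readable, Writable, pipeline } = require('node:stream');

// Hypothetical reader (NOT the Arrow API) exposing a toNodeStream() method.
class FakeReader {
  constructor(items) { this.items = items; }
  toNodeStream() { return Readable.from(this.items); } // object-mode Readable
}

// Hypothetical async factory: returns Promise<FakeReader>, not a FakeReader.
async function fromAsync(items) {
  return new FakeReader(items);
}

// 1. Calling the method directly on the returned Promise throws a TypeError.
let syncError = null;
try {
  fromAsync(['a', 'b']).toNodeStream();
} catch (err) {
  syncError = err;
}

// 2. Awaiting the factory first gives a real reader whose stream can be piped.
async function streamWithAwait(items) {
  const reader = await fromAsync(items);
  const chunks = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) { chunks.push(chunk); callback(); },
  });
  await new Promise((resolve, reject) => {
    pipeline(reader.toNodeStream(), sink, (err) => (err ? reject(err) : resolve()));
  });
  return chunks;
}

streamWithAwait(['x', 'y']).then((chunks) => console.log(chunks)); // [ 'x', 'y' ]
{code}

Awaiting alone may not be enough for the code in the report: {{RecordBatchReader}} parses Arrow IPC-encoded bytes, whereas {{builderThroughAsyncIterable}} appears to yield {{Vector}} chunks, so the builder output probably has to be turned into record batches and written through {{RecordBatchStreamWriter}} directly.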
--
This message was sent by Atlassian Jira
(v8.20.7#820007)