trxcllnt commented on issue #41683:
URL: https://github.com/apache/arrow/issues/41683#issuecomment-2148082482
@vivek1729 As for asynchronously reading batches from a stream, you have the
right idea. I recommend either using async iterables (the `for await (const batch
of reader) { ... }` syntax) or the Node/WHATWG streaming APIs.
I have an old codepen example of different strategies
[here](https://codepen.io/trxcllnt/pen/vYYbage?editors=0012). Lines 29-39 are
most relevant to your situation. I put this together to show a friend how to
wrap the Arrow IPC format in a custom binary protocol, which is why it includes
some intermediate transform steps between the Arrow IPC writer and reader.
Here's a shorter example:
```js
import * as Arrow from 'apache-arrow';
for await (const batch of Arrow.RecordBatchReader.from(await fetch('/get-a-table'))) {
  console.log(batch.toArray());
}
```
One neat thing the JS implementation supports is reading/writing multiple
tables on the same stream. This can be handy if you need to send data under
latency-sensitive conditions, where opening/closing the underlying transport
incurs unnecessary overhead (think HTTP response streams):
```ts
import * as Arrow from 'apache-arrow';
let tableCount = 0;
for await (const reader of Arrow.RecordBatchReader.readAll(await fetch('/get-many-tables'))) {
  switch (tableCount++) {
    case 0: doThingWithTheFirstTable(reader); break;
    case 1: doThingWithTheSecondTable(reader); break;
    // ...
  }
}

function doThingWithTheFirstTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ strs: Arrow.Utf8 }>
) { /*...*/ }

function doThingWithTheSecondTable(
  reader: Arrow.AsyncRecordBatchStreamReader<{ ints: Arrow.Int32 }>
) { /*...*/ }
```
And this is what that might look like on the server:
```js
import * as https from 'node:https';
import * as Arrow from 'apache-arrow';
https.createServer(options, async (req, res) => {
  res.writeHead(200);
  // set autoDestroy: false so the underlying response isn't closed after sending the first table
  // initially call `.reset(res)` so the writer has a sink and immediately flushes each batch
  const writer = new Arrow.RecordBatchStreamWriter({ autoDestroy: false }).reset(res);
  // alternatively, you can write each table as individual batches if producing asynchronously
  // sleep then write the first table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    strs: Arrow.vectorFromArray(['a', 'b', 'c'], new Arrow.Utf8)
  }));
  // sleep then write the second table
  await new Promise((r) => setTimeout(r, 1000));
  writer.write(new Arrow.Table({
    ints: Arrow.vectorFromArray([0, 1, 2, 3, 4, 5], new Arrow.Int32)
  }));
  // must explicitly close when done (this will call `res.end()` on the response)
  writer.close();
}).listen(8000);
```