This is an automated email from the ASF dual-hosted git repository.
domoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5611f2bd0d ARROW-16704: [JS] Handle case where `tableFromIPC` input is
an async `RecordBatchReader` (#13278)
5611f2bd0d is described below
commit 5611f2bd0d6136b005d137a84b50709fc5c813bb
Author: Paul Taylor <[email protected]>
AuthorDate: Wed Jun 1 11:32:46 2022 -0700
ARROW-16704: [JS] Handle case where `tableFromIPC` input is an async
`RecordBatchReader` (#13278)
Authored-by: ptaylor <[email protected]>
Signed-off-by: Dominik Moritz <[email protected]>
---
js/src/ipc/reader.ts | 2 +-
js/src/ipc/serialization.ts | 26 ++++++++++++++++++++------
js/test/unit/ipc/serialization-tests.ts | 32 ++++++++++++++++++++++++++++----
3 files changed, 49 insertions(+), 11 deletions(-)
diff --git a/js/src/ipc/reader.ts b/js/src/ipc/reader.ts
index 5b949322d9..77496e799b 100644
--- a/js/src/ipc/reader.ts
+++ b/js/src/ipc/reader.ts
@@ -147,7 +147,7 @@ export class RecordBatchReader<T extends TypeMap = any>
extends ReadableInterop<
public static from<T extends TypeMap = any>(source: FromArg1):
Promise<RecordBatchStreamReader<T>>;
public static from<T extends TypeMap = any>(source: FromArg2):
RecordBatchFileReader<T> | RecordBatchStreamReader<T>;
public static from<T extends TypeMap = any>(source: FromArg3):
Promise<RecordBatchFileReader<T> | RecordBatchStreamReader<T>>;
- public static from<T extends TypeMap = any>(source: FromArg4):
Promise<RecordBatchFileReader<T> | AsyncRecordBatchReaders<T>>;
+ public static from<T extends TypeMap = any>(source: FromArg4):
Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
public static from<T extends TypeMap = any>(source: FromArg5):
Promise<AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>>;
/** @nocollapse */
public static from<T extends TypeMap = any>(source: any) {
diff --git a/js/src/ipc/serialization.ts b/js/src/ipc/serialization.ts
index 680babd7b2..aee4676213 100644
--- a/js/src/ipc/serialization.ts
+++ b/js/src/ipc/serialization.ts
@@ -18,22 +18,36 @@
import { Table } from '../table.js';
import { TypeMap } from '../type.js';
import { isPromise } from '../util/compat.js';
-import { FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5,
RecordBatchReader } from './reader.js';
+import {
+ FromArg0, FromArg1, FromArg2, FromArg3, FromArg4, FromArg5,
+ RecordBatchReader,
+ RecordBatchFileReader, RecordBatchStreamReader,
+ AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader
+} from './reader.js';
import { RecordBatchFileWriter, RecordBatchStreamWriter } from './writer.js';
+type RecordBatchReaders<T extends TypeMap = any> = RecordBatchFileReader<T> |
RecordBatchStreamReader<T>;
+type AsyncRecordBatchReaders<T extends TypeMap = any> =
AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>;
+
/**
* Deserialize the IPC format into a {@link Table}. This function is a
* convenience wrapper for {@link RecordBatchReader}. Opposite of {@link
tableToIPC}.
*/
export function tableFromIPC<T extends TypeMap = any>(source: FromArg0 |
FromArg2): Table<T>;
export function tableFromIPC<T extends TypeMap = any>(source: FromArg1):
Promise<Table<T>>;
-export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 |
FromArg4 | FromArg5): Promise<Table<T>> | Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source: FromArg3 |
FromArg4 | FromArg5): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source:
RecordBatchReaders<T>): Table<T>;
+export function tableFromIPC<T extends TypeMap = any>(source:
AsyncRecordBatchReaders<T>): Promise<Table<T>>;
+export function tableFromIPC<T extends TypeMap = any>(source:
RecordBatchReader<T>): Table<T> | Promise<Table<T>>;
export function tableFromIPC<T extends TypeMap = any>(input: any): Table<T> |
Promise<Table<T>> {
- const reader = RecordBatchReader.from<T>(input);
- if (isPromise(reader)) {
- return (async () => new Table(await (await reader).readAll()))();
+ const reader = RecordBatchReader.from<T>(input) as RecordBatchReader<T> |
Promise<RecordBatchReader<T>>;
+ if (isPromise<RecordBatchReader<T>>(reader)) {
+ return reader.then((reader) => tableFromIPC(reader)) as
Promise<Table<T>>;
+ }
+ if (reader.isAsync()) {
+ return (reader as AsyncRecordBatchReaders<T>).readAll().then((xs) =>
new Table(xs));
}
- return new Table(reader.readAll());
+ return new Table((reader as RecordBatchReaders<T>).readAll());
}
/**
diff --git a/js/test/unit/ipc/serialization-tests.ts
b/js/test/unit/ipc/serialization-tests.ts
index 375b5159e6..f31a242096 100644
--- a/js/test/unit/ipc/serialization-tests.ts
+++ b/js/test/unit/ipc/serialization-tests.ts
@@ -19,9 +19,9 @@ import '../../jest-extensions.js';
import * as generate from '../../generate-test-data.js';
import {
- Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32, Utf8,
Null,
+ Table, Schema, Field, DataType, TypeMap, Dictionary, Int32, Float32,
Uint8, Utf8, Null,
makeVector,
- tableFromIPC, tableToIPC
+ tableFromIPC, tableToIPC, RecordBatchReader, RecordBatchStreamWriter
} from 'apache-arrow';
const deepCopy = (t: Table) => tableFromIPC(tableToIPC(t));
@@ -37,6 +37,30 @@ function createTable<T extends TypeMap = any>(schema:
Schema<T>, chunkLengths: n
return generate.table(chunkLengths, schema).table;
}
+describe('tableFromIPC', () => {
+ test('handles AsyncRecordBatchReader input', async () => {
+ type T = { a: Uint8 };
+
+ const sources = [
+ new Table({ a: makeVector(new Uint8Array([1, 2, 3])) }),
+ new Table({ a: makeVector(new Uint8Array([4, 5, 6])) }),
+ ];
+
+ const writer = sources.reduce(
+ (writer, source) => writer.writeAll(source),
+ new RecordBatchStreamWriter<T>({ autoDestroy: false })
+ );
+
+ writer.close();
+
+ let index = 0;
+
+ for await (const reader of
RecordBatchReader.readAll<T>(writer[Symbol.asyncIterator]())) {
+ expect(await tableFromIPC(reader)).toEqualTable(sources[index++]);
+ }
+ });
+});
+
describe('tableToIPC()', () => {
test(`to file format`, () => {
@@ -45,7 +69,7 @@ describe('tableToIPC()', () => {
});
const buffer = tableToIPC(source, 'file');
const result = tableFromIPC(buffer);
- expect(source).toEqualTable(result);
+ expect(result).toEqualTable(source);
});
test(`to stream format`, () => {
@@ -54,7 +78,7 @@ describe('tableToIPC()', () => {
});
const buffer = tableToIPC(source, 'stream');
const result = tableFromIPC(buffer);
- expect(source).toEqualTable(result);
+ expect(result).toEqualTable(source);
});
test(`doesn't swap the order of buffers that share the same underlying
ArrayBuffer but are in a different order`, () => {