trxcllnt commented on a change in pull request #10371: URL: https://github.com/apache/arrow/pull/10371#discussion_r637038934
########## File path: js/src/table.ts ########## @@ -16,280 +16,226 @@ // under the License. import { Data } from './data'; -import { Column } from './column'; -import { Schema, Field } from './schema'; +import { Type } from './enum'; +import { Vector } from './vector'; +import { Schema } from './schema'; +import { DataType, Struct } from './type'; +import { compareSchemas } from './visitor/typecomparator'; + +import { + ChunkedIterator, + isChunkedValid, + computeChunkOffsets, + computeChunkNullCounts, + wrapChunkedGet, + wrapChunkedCall1, + wrapChunkedCall2, + wrapChunkedSet, + wrapChunkedIndexOf, +} from './util/chunk'; + +import { IndexingProxyHandlerMixin } from './util/proxy'; + +import { instance as getVisitor } from './visitor/get'; +import { instance as setVisitor } from './visitor/set'; +import { instance as indexOfVisitor } from './visitor/indexof'; +import { instance as toArrayVisitor } from './visitor/toarray'; +import { instance as byteLengthVisitor } from './visitor/bytelength'; + import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from './recordbatch'; -import { DataFrame } from './compute/dataframe'; -import { RecordBatchReader } from './ipc/reader'; -import { DataType, RowLike, Struct } from './type'; -import { selectColumnArgs, selectArgs } from './util/args'; -import { Clonable, Sliceable, Applicative } from './vector'; -import { isPromise, isIterable, isAsyncIterable } from './util/compat'; -import { RecordBatchFileWriter, RecordBatchStreamWriter } from './ipc/writer'; -import { distributeColumnsIntoRecordBatches, distributeVectorsIntoRecordBatches } from './util/recordbatch'; -import { Vector, Chunked, StructVector, VectorBuilderOptions, VectorBuilderOptionsAsync } from './vector/index'; -import { TypedArray, TypedArrayDataType } from './interfaces'; - -type VectorMap = { [key: string]: Vector | Exclude<TypedArray, Uint8ClampedArray> }; -type Fields<T extends { [key: string]: DataType }> = (keyof T)[] | Field<T[keyof T]>[]; -type ChildData<T extends { [key: string]: DataType }> = Data<T[keyof T]>[] | Vector<T[keyof T]>[]; -type Columns<T extends { [key: string]: DataType }> = Column<T[keyof T]>[] | Column<T[keyof T]>[][]; +/** @ignore */ export interface Table<T extends { [key: string]: DataType } = any> { + /// + // Virtual properties for the TypeScript compiler. + // These do not exist at runtime. + /// + readonly TType: Struct<T>; + readonly TArray: Struct<T>['TArray']; + readonly TValue: Struct<T>['TValue']; - get(index: number): Struct<T>['TValue']; - [Symbol.iterator](): IterableIterator<RowLike<T>>; + /** + * @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/isConcatSpreadable + */ + [Symbol.isConcatSpreadable]: true; +} - slice(begin?: number, end?: number): Table<T>; - concat(...others: Vector<Struct<T>>[]): Table<T>; - clone(chunks?: RecordBatch<T>[], offsets?: Uint32Array): Table<T>; +export class Table<T extends { [key: string]: DataType } = any> { - scan(next: import('./compute/dataframe').NextFunc, bind?: import('./compute/dataframe').BindFunc): void; - scanReverse(next: import('./compute/dataframe').NextFunc, bind?: import('./compute/dataframe').BindFunc): void; - countBy(name: import('./compute/predicate').Col | string): import('./compute/dataframe').CountByResult; - filter(predicate: import('./compute/predicate').Predicate): import('./compute/dataframe').FilteredDataFrame<T>; -} + constructor(columns: { [P in keyof T]: Vector<T[P]> }); + constructor(schema: Schema<T>, data?: RecordBatch<T> | RecordBatch<T>[]); + constructor(schema: Schema<T>, data?: RecordBatch<T> | RecordBatch<T>[], offsets?: Uint32Array); + constructor(...args: any[]) { -export class Table<T extends { [key: string]: DataType } = any> - extends Chunked<Struct<T>> - implements DataFrame<T>, - Clonable<Table<T>>, - Sliceable<Table<T>>, - Applicative<Struct<T>, Table<T>> { - - /** @nocollapse */ - public static empty<T extends { [key: string]: DataType } = Record<string, never>>(schema = new Schema<T>([])) { return new Table<T>(schema, []); } - - public static from(): Table<Record<string, never>>; - public static from<T extends { [key: string]: DataType } = any>(source: RecordBatchReader<T>): Table<T>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg0): Table<T>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg2): Table<T>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg1): Promise<Table<T>>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg3): Promise<Table<T>>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg4): Promise<Table<T>>; - public static from<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArg5): Promise<Table<T>>; - public static from<T extends { [key: string]: DataType } = any>(source: PromiseLike<RecordBatchReader<T>>): Promise<Table<T>>; - public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptions<Struct<T>, TNull>): Table<T>; - public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptionsAsync<Struct<T>, TNull>): Promise<Table<T>>; - /** @nocollapse */ - public static from<T extends { [key: string]: DataType } = any, TNull = any>(input?: any) { - - if (!input) { return Table.empty(); } - - if (typeof input === 'object') { - const table = isIterable(input['values']) ? tableFromIterable<T, TNull>(input) - : isAsyncIterable(input['values']) ? tableFromAsyncIterable<T, TNull>(input) - : null; - if (table !== null) { return table; } + if (args.length === 0) { + args = [new Schema([])]; } - let reader = RecordBatchReader.from<T>(input) as RecordBatchReader<T> | Promise<RecordBatchReader<T>>; - - if (isPromise<RecordBatchReader<T>>(reader)) { - return (async () => await Table.from(await reader))(); + if (args.length === 1 && !(args[0] instanceof Schema)) { + const [obj] = args as [{ [P in keyof T]: Vector<T[P]> }]; + const batches = Object.keys(obj).reduce((batches, name: keyof T) => { + obj[name].data.forEach((data, row) => { + (batches[row] || ( + batches[row] = {} as { [P in keyof T]: Data<T[P]> }) + )[name] = data; + }); + return batches; + }, new Array<{ [P in keyof T]: Data<T[P]> }>()) + .map((data) => new RecordBatch<T>(data)); + + args = [batches[0].schema, batches]; } - if (reader.isSync() && (reader = reader.open())) { - return !reader.schema ? Table.empty() : new Table<T>(reader.schema, [...reader]); + + let [schema, data, offsets] = args; + + if (!(schema instanceof Schema)) { + throw new TypeError('Table constructor expects a [Schema, RecordBatch[]] pair.'); } - return (async (opening) => { - const reader = await opening; - const schema = reader.schema; - const batches: RecordBatch[] = []; - if (schema) { - for await (const batch of reader) { - batches.push(batch); - } - return new Table<T>(schema, batches); + + this.schema = schema; + + [, data = [new _InternalEmptyPlaceholderRecordBatch(schema)]] = args; + + const batches: RecordBatch<T>[] = Array.isArray(data) ? data : [data]; + + batches.forEach((batch: RecordBatch<T>) => { + if (!(batch instanceof RecordBatch)) { + throw new TypeError('Table constructor expects a [Schema, RecordBatch[]] pair.'); } - return Table.empty(); - })(reader.open()); - } + if (!compareSchemas(this.schema, batch.schema)) { + throw new TypeError('Table and all RecordBatch schemas must be equivalent.'); + } + }, new Struct(schema.fields)); - /** @nocollapse */ - public static async fromAsync<T extends { [key: string]: DataType } = any>(source: import('./ipc/reader').FromArgs): Promise<Table<T>> { - return await Table.from<T>(source as any); + this.data = batches.map(({ data }) => data); + this._offsets = offsets ?? computeChunkOffsets(this.data); } - /** @nocollapse */ - public static fromStruct<T extends { [key: string]: DataType } = any>(vector: Vector<Struct<T>>) { - return Table.new<T>(vector.data.childData as Data<T[keyof T]>[], vector.type.children); - } + protected _offsets!: Uint32Array; + protected _nullCount!: number; + protected _children?: Vector[]; /** - * @summary Create a new Table from a collection of Columns or Vectors, - * with an optional list of names or Fields. - * - * - * `Table.new` accepts an Object of - * Columns or Vectors, where the keys will be used as the field names - * for the Schema: - * ```ts - * const i32s = Int32Vector.from([1, 2, 3]); - * const f32s = Float32Vector.from([.1, .2, .3]); - * const table = Table.new({ i32: i32s, f32: f32s }); - * assert(table.schema.fields[0].name === 'i32'); - * ``` - * - * It also accepts a a list of Vectors with an optional list of names or - * Fields for the resulting Schema. If the list is omitted or a name is - * missing, the numeric index of each Vector will be used as the name: - * ```ts - * const i32s = Int32Vector.from([1, 2, 3]); - * const f32s = Float32Vector.from([.1, .2, .3]); - * const table = Table.new([i32s, f32s], ['i32']); - * assert(table.schema.fields[0].name === 'i32'); - * assert(table.schema.fields[1].name === '1'); - * ``` - * - * If the supplied arguments are Columns, `Table.new` will infer the Schema - * from the Columns: - * ```ts - * const i32s = Column.new('i32', Int32Vector.from([1, 2, 3])); - * const f32s = Column.new('f32', Float32Vector.from([.1, .2, .3])); - * const table = Table.new(i32s, f32s); - * assert(table.schema.fields[0].name === 'i32'); - * assert(table.schema.fields[1].name === 'f32'); - * ``` - * - * If the supplied Vector or Column lengths are unequal, `Table.new` will - * extend the lengths of the shorter Columns, allocating additional bytes - * to represent the additional null slots. The memory required to allocate - * these additional bitmaps can be computed as: - * ```ts - * let additionalBytes = 0; - * for (let vec in shorter_vectors) { - * additionalBytes += (((longestLength - vec.length) + 63) & ~63) >> 3; - * } - * ``` - * - * For example, an additional null bitmap for one million null values would require - * 125,000 bytes (`((1e6 + 63) & ~63) >> 3`), or approx. `0.11MiB` + * @summary Get and set elements by index. */ - public static new<T extends { [key: string]: DataType } = any>(...columns: Columns<T>): Table<T>; - public static new<T extends VectorMap = any>(children: T): Table<{ [P in keyof T]: T[P] extends Vector ? T[P]['type'] : T[P] extends Exclude<TypedArray, Uint8ClampedArray> ? TypedArrayDataType<T[P]> : never}>; - public static new<T extends { [key: string]: DataType } = any>(children: ChildData<T>, fields?: Fields<T>): Table<T>; - /** @nocollapse */ - public static new(...cols: any[]) { - return new Table(...distributeColumnsIntoRecordBatches(selectColumnArgs(cols))); - } + [index: number]: Struct<T>['TValue'] | null; - constructor(batches: RecordBatch<T>[]); - constructor(...batches: RecordBatch<T>[]); - constructor(schema: Schema<T>, batches: RecordBatch<T>[]); - constructor(schema: Schema<T>, ...batches: RecordBatch<T>[]); - constructor(...args: any[]) { - - let schema: Schema<T> = null!; - - if (args[0] instanceof Schema) { schema = args.shift(); } + public readonly schema!: Schema<T>; - const chunks = selectArgs<RecordBatch<T>>(RecordBatch, args); + /** + * @summary The contiguous {@link RecordBatch `RecordBatch`} chunks of the Table rows. + */ + public readonly data!: ReadonlyArray<Data<Struct<T>>>; - if (!schema && !(schema = chunks[0]?.schema)) { - throw new TypeError('Table must be initialized with a Schema or at least one RecordBatch'); + /** + * @summary The number of null rows in this RecordBatch. + */ + public get nullCount() { + if (this._nullCount === -1) { + this._nullCount = computeChunkNullCounts(this.data); } + return this._nullCount; + } - chunks[0] || (chunks[0] = new _InternalEmptyPlaceholderRecordBatch(schema)); + /** + * @summary Check whether an element is null. + * @param index The index at which to read the validity bitmap. + */ + // @ts-ignore + public isValid(index: number): boolean { return false; } - super(new Struct(schema.fields), chunks); + /** + * @summary Get an element value by position. + * @param index The index of the element to read. + */ + // @ts-ignore + public get(index: number): T['TValue'] | null { return null; } - this._schema = schema; - this._chunks = chunks; - } + /** + * @summary Set an element value by position. + * @param index The index of the element to write. + * @param value The value to set. + */ + // @ts-ignore + public set(index: number, value: T['TValue'] | null): void { return; } - protected _schema: Schema<T>; - // List of inner RecordBatches - protected _chunks: RecordBatch<T>[]; - protected _children?: Column<T[keyof T]>[]; + /** + * @summary Retrieve the index of the first occurrence of a value in an Vector. + * @param element The value to locate in the Vector. + * @param offset The index at which to begin the search. If offset is omitted, the search starts at index 0. + */ + // @ts-ignore + public indexOf(element: T['TValue'], offset?: number): number { return -1; } - public get schema() { return this._schema; } - public get length() { return this._length; } - public get chunks() { return this._chunks; } - public get numCols() { return this._numChildren; } + /** + * @summary Get the size in bytes of an element by index. + * @param index The index at which to get the byteLength. + */ + // @ts-ignore + public getByteLength(index: number): number { return 0; } - public clone(chunks = this._chunks) { - return new Table<T>(this._schema, chunks); + /** + * @summary Iterator for rows in this Table. + */ + public [Symbol.iterator]() { + return new ChunkedIterator(this.data); } - public getColumn<R extends keyof T>(name: R): Column<T[R]> { - return this.getColumnAt(this.getColumnIndex(name)) as Column<T[R]>; - } - public getColumnAt<R extends DataType = any>(index: number): Column<R> | null { - return this.getChildAt(index); - } - public getColumnIndex<R extends keyof T>(name: R) { - return this._schema.fields.findIndex((f) => f.name === name); - } - public getChildAt<R extends DataType = any>(index: number): Column<R> | null { - if (index < 0 || index >= this.numChildren) { return null; } - let field: Field<R>, child: Column<R>; - const fields = (this._schema as Schema<any>).fields; - const columns = this._children || (this._children = []) as Column[]; - if (child = columns[index]) { return child as Column<R>; } - if (field = fields[index]) { - const chunks = this._chunks - .map((chunk) => chunk.getChildAt<R>(index)) - .filter((vec): vec is Vector<R> => vec != null); - if (chunks.length > 0) { - return (columns[index] = new Column<R>(field, chunks)); - } - } - return null; + /** + * @summary Return a JavaScript Array of the Table rows. + * @returns An Array of Table rows. + */ + public toArray() { + return this.data.reduce((ary, data) => + ary.concat(toArrayVisitor.visit(data)), + new Array<Struct<T>['TValue']>() + ); Review comment: dunno, can't run benchmarks yet :stuck_out_tongue: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org