Djjanks commented on code in PR #14: URL: https://github.com/apache/arrow-js/pull/14#discussion_r2134714400
########## test/unit/ipc/writer/file-writer-tests.ts: ########## @@ -32,6 +34,41 @@ import { Uint32, Vector } from 'apache-arrow'; +import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry'; +import * as lz4js from 'lz4js'; + +export async function registerCompressionCodecs(): Promise<void> { Review Comment: I've implemented ZSTD compression with async initialization since most popular libraries require WASM/Node.js. I used dynamic import for ZSTD to avoid bundling issues. Duplicated the registration logic from stream writer because separating it into a shared module caused Jest import errors. May by somebody knows more elegant way to import zstd? ########## src/ipc/writer.ts: ########## @@ -251,34 +274,99 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop< } protected _writeRecordBatch(batch: RecordBatch<T>) { - const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch); - const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions); + const { byteLength, nodes, bufferRegions, buffers } = this._assembleRecordBatch(batch); + const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression); const message = Message.from(recordBatch, byteLength); return this ._writeDictionaries(batch) ._writeMessage(message) ._writeBodyBuffers(buffers); } + protected _assembleRecordBatch(batch: RecordBatch<T>) { + let { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch); + if (this._compression != null) { + ({ byteLength, bufferRegions, buffers } = this._compressBodyBuffers(buffers)); + } + return { byteLength, nodes, bufferRegions, buffers }; + } + + protected _compressBodyBuffers(buffers: ArrayBufferView[]) { + const codec = compressionRegistry.get(this._compression!.type!); + + if (!codec?.encode || typeof codec.encode !== 'function') { + throw new Error(`Codec for compression type "${CompressionType[this._compression!.type!]}" has invalid encode method`); + } + + let currentOffset = 0; + const compressedBuffers: ArrayBufferView[] = []; + const bufferRegions: metadata.BufferRegion[] = []; + + for (const buffer of buffers) { + const byteBuf = toUint8Array(buffer); + + if (byteBuf.length === 0) { + compressedBuffers.push(new Uint8Array(0), new Uint8Array(0)); + bufferRegions.push(new metadata.BufferRegion(currentOffset, 0)); + continue; + } + + const compressed = codec.encode(byteBuf); + const isCompressionEffective = compressed.length < byteBuf.length; + + const finalBuffer = isCompressionEffective ? compressed : byteBuf; + const byteLength = isCompressionEffective ? finalBuffer.length : LENGTH_NO_COMPRESSED_DATA; + + const lengthPrefix = new flatbuffers.ByteBuffer(new Uint8Array(COMPRESS_LENGTH_PREFIX)); + lengthPrefix.writeInt64(0, BigInt(byteLength)); + + compressedBuffers.push(lengthPrefix.bytes(), new Uint8Array(finalBuffer)); + + const padding = ((currentOffset + 7) & ~7) - currentOffset; + currentOffset += padding; + + const fullBodyLength = COMPRESS_LENGTH_PREFIX + finalBuffer.length; + bufferRegions.push(new metadata.BufferRegion(currentOffset, fullBodyLength)); + + currentOffset += fullBodyLength; + } + const finalPadding = ((currentOffset + 7) & ~7) - currentOffset; + currentOffset += finalPadding; + + return { byteLength: currentOffset, bufferRegions, buffers: compressedBuffers }; + } + protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) { const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary])); - const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions); + const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null); const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta); const message = Message.from(dictionaryBatch, byteLength); return this ._writeMessage(message) - ._writeBodyBuffers(buffers); + ._writeBodyBuffers(buffers, "dictionary"); } - protected _writeBodyBuffers(buffers: ArrayBufferView[]) { - let buffer: ArrayBufferView; - let size: number, padding: number; - for (let i = -1, n = buffers.length; ++i < n;) { - if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) { - this._write(buffer); - if ((padding = ((size + 7) & ~7) - size) > 0) { - this._writePadding(padding); - } + protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") { Review Comment: According to [Arrow format documentation](https://arrow.apache.org/docs/format/Columnar.html#compression), only record batches are compressed, not dictionary batches. Is this correct? I add attribute batchType to aviod bufGroupSize compression logic. Have I understood this correctly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org