Djjanks commented on code in PR #14:
URL: https://github.com/apache/arrow-js/pull/14#discussion_r2134714400
##########
test/unit/ipc/writer/file-writer-tests.ts:
##########
@@ -32,6 +34,41 @@ import {
Uint32,
Vector
} from 'apache-arrow';
+import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
+import * as lz4js from 'lz4js';
+
+export async function registerCompressionCodecs(): Promise<void> {
Review Comment:
I've implemented ZSTD compression with async initialization, since most of the
popular libraries rely on WASM or Node.js bindings. I used a dynamic import for
ZSTD to avoid bundling issues. I duplicated the registration logic from the
stream writer because splitting it out into a shared module caused Jest import
errors. Maybe somebody knows a more elegant way to import ZSTD?
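For reference, here is a rough sketch of the shape I had in mind for the
registration helper. It assumes the registry exposes a `set()` counterpart to the
`get()` used in `writer.ts`, that `CompressionType` can be imported from
`apache-arrow`, and it uses `@bokuweb/zstd-wasm` purely as a placeholder for
whichever ZSTD binding we settle on; none of those choices are fixed by this PR.

```ts
import { CompressionType } from 'apache-arrow';
import { Codec, compressionRegistry } from 'apache-arrow/ipc/compression/registry';
import * as lz4js from 'lz4js';

export async function registerCompressionCodecs(): Promise<void> {
    // LZ4 is a plain synchronous JS library, so it can be registered up front.
    const lz4Codec: Codec = {
        encode: (data: Uint8Array) => lz4js.compress(data),
        decode: (data: Uint8Array) => lz4js.decompress(data)
    };
    compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec); // assumed set() API

    // Most ZSTD bindings ship a WASM module that needs async initialization,
    // hence the dynamic import; '@bokuweb/zstd-wasm' is only a placeholder here.
    const zstd = await import('@bokuweb/zstd-wasm');
    await zstd.init();
    const zstdCodec: Codec = {
        encode: (data: Uint8Array) => zstd.compress(data),
        decode: (data: Uint8Array) => zstd.decompress(data)
    };
    compressionRegistry.set(CompressionType.ZSTD, zstdCodec);
}
```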
##########
src/ipc/writer.ts:
##########
@@ -251,34 +274,99 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
}
protected _writeRecordBatch(batch: RecordBatch<T>) {
- const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
- const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions);
+ const { byteLength, nodes, bufferRegions, buffers } = this._assembleRecordBatch(batch);
+ const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression);
const message = Message.from(recordBatch, byteLength);
return this
._writeDictionaries(batch)
._writeMessage(message)
._writeBodyBuffers(buffers);
}
+ protected _assembleRecordBatch(batch: RecordBatch<T>) {
+ let { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
+ if (this._compression != null) {
+ ({ byteLength, bufferRegions, buffers } = this._compressBodyBuffers(buffers));
+ }
+ return { byteLength, nodes, bufferRegions, buffers };
+ }
+
+ protected _compressBodyBuffers(buffers: ArrayBufferView[]) {
+ const codec = compressionRegistry.get(this._compression!.type!);
+
+ if (!codec?.encode || typeof codec.encode !== 'function') {
+ throw new Error(`Codec for compression type "${CompressionType[this._compression!.type!]}" has invalid encode method`);
+ }
+
+ let currentOffset = 0;
+ const compressedBuffers: ArrayBufferView[] = [];
+ const bufferRegions: metadata.BufferRegion[] = [];
+
+ for (const buffer of buffers) {
+ const byteBuf = toUint8Array(buffer);
+
+ if (byteBuf.length === 0) {
+ compressedBuffers.push(new Uint8Array(0), new Uint8Array(0));
+ bufferRegions.push(new metadata.BufferRegion(currentOffset, 0));
+ continue;
+ }
+
+ const compressed = codec.encode(byteBuf);
+ const isCompressionEffective = compressed.length < byteBuf.length;
+
+ const finalBuffer = isCompressionEffective ? compressed : byteBuf;
+ const byteLength = isCompressionEffective ? finalBuffer.length : LENGTH_NO_COMPRESSED_DATA;
+
+ const lengthPrefix = new flatbuffers.ByteBuffer(new Uint8Array(COMPRESS_LENGTH_PREFIX));
+ lengthPrefix.writeInt64(0, BigInt(byteLength));
+
+ compressedBuffers.push(lengthPrefix.bytes(), new Uint8Array(finalBuffer));
+
+ const padding = ((currentOffset + 7) & ~7) - currentOffset;
+ currentOffset += padding;
+
+ const fullBodyLength = COMPRESS_LENGTH_PREFIX + finalBuffer.length;
+ bufferRegions.push(new metadata.BufferRegion(currentOffset, fullBodyLength));
+
+ currentOffset += fullBodyLength;
+ }
+ const finalPadding = ((currentOffset + 7) & ~7) - currentOffset;
+ currentOffset += finalPadding;
+
+ return { byteLength: currentOffset, bufferRegions, buffers: compressedBuffers };
+ }
+
protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
- const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
+ const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, null);
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
const message = Message.from(dictionaryBatch, byteLength);
return this
._writeMessage(message)
- ._writeBodyBuffers(buffers);
+ ._writeBodyBuffers(buffers, "dictionary");
}
- protected _writeBodyBuffers(buffers: ArrayBufferView[]) {
- let buffer: ArrayBufferView;
- let size: number, padding: number;
- for (let i = -1, n = buffers.length; ++i < n;) {
- if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) {
- this._write(buffer);
- if ((padding = ((size + 7) & ~7) - size) > 0) {
- this._writePadding(padding);
- }
+ protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") {
Review Comment:
According to the [Arrow format
documentation](https://arrow.apache.org/docs/format/Columnar.html#compression),
only record batches are compressed, not dictionary batches. Is this correct? I
added the batchType parameter so the bufGroupSize compression logic is not
applied to dictionary batches. Have I understood this correctly?
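To make the question concrete, here is a rough sketch of how I picture
`_writeBodyBuffers` using the flag. It assumes `_compressBodyBuffers` emits
buffers in (length prefix, body) pairs for compressed record batches and relies
on the writer's existing `_write`/`_writePadding` helpers; treat it as an
illustration of the intent rather than the exact implementation in this PR.

```ts
protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: "record" | "dictionary" = "record") {
    // Dictionary batches (and uncompressed record batches) keep the original
    // behaviour: write each buffer and pad it out to an 8-byte boundary.
    if (batchType === "dictionary" || this._compression == null) {
        for (const buffer of buffers) {
            const size = buffer?.byteLength ?? 0;
            if (size > 0) {
                this._write(buffer);
                const padding = ((size + 7) & ~7) - size;
                if (padding > 0) { this._writePadding(padding); }
            }
        }
        return this;
    }
    // Compressed record batches: _compressBodyBuffers pushes buffers in pairs
    // (8-byte length prefix, then the body), so write each pair back to back
    // and pad after the pair so the prefix stays adjacent to its body.
    for (let i = 0; i < buffers.length; i += 2) {
        const prefix = buffers[i];
        const body = buffers[i + 1];
        const groupSize = prefix.byteLength + body.byteLength;
        if (groupSize === 0) { continue; } // empty column buffer, nothing to emit
        this._write(prefix);
        this._write(body);
        const padding = ((groupSize + 7) & ~7) - groupSize;
        if (padding > 0) { this._writePadding(padding); }
    }
    return this;
}
```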
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]