This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-js.git


The following commit(s) were added to refs/heads/main by this push:
     new a0e6bc1  feat: Implement IPC RecordBatch body buffer compression (#14)
a0e6bc1 is described below

commit a0e6bc1023165d147c299a2cd2b38607fb7536e6
Author: Dmitrii Ignatov <[email protected]>
AuthorDate: Mon Sep 15 05:56:50 2025 +0300

    feat: Implement IPC RecordBatch body buffer compression (#14)
    
    ### Rationale for this change
    This change introduces support for reading compressed Arrow IPC streams
    in JavaScript. The primary motivation is the need to read Arrow IPC
    streams in the browser when they are transmitted over the network in a
    compressed format to reduce network load.
    
    Several reasons support this enhancement:
    - A personal need in another project to read compressed Arrow IPC streams.
    - Community demand, as seen in [Issue
    apache/arrow-js#109](https://github.com/apache/arrow-js/issues/109).
    - A similar implementation was attempted in [PR
    apache/arrow#13076](https://github.com/apache/arrow/pull/13076) but was
    never merged. I am very grateful to @kylebarron.
    - Other language implementations (e.g., C++, Python, Rust) already
    support IPC compression.
    
    ### What changes are included in this PR?
    - Support for decoding compressed RecordBatch buffers during reading.
    - Each buffer is decompressed individually, offsets are recalculated
    with 8-byte alignment, and a new metadata.RecordBatch is constructed
    before loading vectors.
    - Only decompression is implemented; compression (writing) is not
    supported yet.
    - Currently tested with the LZ4 codec using the lz4js library. lz4-wasm
    was evaluated but rejected due to incompatibility with the LZ4 Frame format.
    - The decompression logic is isolated to _loadRecordBatch() in the
    RecordBatchReaderImpl class.
    - A codec.decode function is retrieved from the compressionRegistry and
    applied per buffer, so users can choose a suitable library.
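    The per-buffer offset recalculation described above can be sketched as
    follows (a standalone sketch; `realignRegions` and the plain
    `BufferRegion` shape are illustrative here, not the library's internal
    API):
    ```ts
    interface BufferRegion { offset: number; length: number }

    // Lay each decompressed buffer out at the next 8-byte-aligned offset,
    // mirroring the realignment described above.
    function realignRegions(decompressedLengths: number[]): BufferRegion[] {
        const regions: BufferRegion[] = [];
        let offset = 0;
        for (const length of decompressedLengths) {
            offset = (offset + 7) & ~7; // round up to the next multiple of 8
            regions.push({ offset, length });
            offset += length;
        }
        return regions;
    }
    ```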
    
    #### Additional notes:
    1. Codec compatibility caveats
    Not all JavaScript LZ4 libraries are compatible with the Arrow IPC
    format. For example:
    - lz4js works correctly as it supports the LZ4 Frame Format.
    - lz4-wasm is not compatible, as it expects raw LZ4 blocks and fails to
    decompress LZ4 frame data.
    This can result in silent or cryptic errors. To improve developer
    experience, we could:
    - Wrap codec.decode calls in try/catch and surface a clearer error
    message if decompression fails.
    - Add an optional check in compressionRegistry.set() to validate that
    the codec supports LZ4 Frame Format. One way would be to compress dummy
    data and inspect the first 4 bytes for the expected LZ4 Frame magic
    header (0x04 0x22 0x4D 0x18).
    2. Reconstruction of metadata.RecordBatch
    After decompressing the buffers, new BufferRegion entries are calculated
    to match the uncompressed data layout. A new metadata.RecordBatch is
    constructed with the updated buffer regions and passed into
    _loadVectors().
    This introduces a mutation-like pattern that may break assumptions in
    the current design. However, it's necessary because:
    - _loadVectors() depends strictly on the offsets in header.buffers,
    which no longer match the decompressed buffer layout.
    - Without changing either _loadVectors() or metadata.RecordBatch, the
    current approach is the least intrusive.
    3. Setting compression = null in new RecordBatch
    When reconstructing the metadata, the compression field is explicitly
    set to null, since the data is already decompressed in memory.
    This decision is somewhat debatable — feedback is welcome on whether
    it's better to retain the original compression metadata or to reflect
    the current state of the buffer (uncompressed). The current
    implementation assumes the latter.
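    Regarding note 1, the optional registry-time check could look roughly
    like this (a sketch only; `looksLikeLz4Frame` is an illustrative helper
    and not part of this PR):
    ```ts
    // Inspect the first 4 bytes of a compressed sample for the LZ4 Frame
    // magic number (0x04 0x22 0x4D 0x18).
    const LZ4_FRAME_MAGIC = new Uint8Array([0x04, 0x22, 0x4d, 0x18]);

    function looksLikeLz4Frame(compressed: Uint8Array): boolean {
        return compressed.length >= LZ4_FRAME_MAGIC.length &&
            LZ4_FRAME_MAGIC.every((byte, i) => compressed[i] === byte);
    }
    ```
    A registry could run this on `codec.encode(dummyData)` before accepting
    the codec, turning a cryptic decompression failure into a clear
    registration-time error.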
    
    ### Are these changes tested?
    - The changes were tested in my own project using LZ4-compressed Arrow
    streams.
    - Tested uncompressed, compressed, and pseudo-compressed (uncompressed
    data length = -1) data.
    - No unit tests are included in this PR yet.
    - The decompression was verified with real-world data and the lz4js
    codec (lz4-wasm is not compatible).
    - No issues were observed with alignment, vector loading, or
    decompression integrity.
    - Exception handling is not yet added around codec.decode. This may be
    useful for catching codec incompatibility and providing better user
    feedback.
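    The "pseudo compressed" case above comes from the Arrow IPC buffer
    framing: each body buffer starts with an 8-byte little-endian length
    prefix, and a prefix of -1 means the payload that follows is stored
    uncompressed. A minimal sketch of reading one framed buffer (the
    `readFramedBuffer` helper is illustrative):
    ```ts
    function readFramedBuffer(
        buf: Uint8Array,
        decode: (bytes: Uint8Array) => Uint8Array
    ): Uint8Array {
        const view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
        const uncompressedLength = view.getBigInt64(0, true); // little-endian i64
        const payload = buf.subarray(8);
        // -1 is the sentinel for "payload is not compressed"
        return uncompressedLength === -1n ? payload : decode(payload);
    }
    ```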
    
    ### Are there any user-facing changes?
    Yes, Arrow JS users can now read compressed IPC streams, assuming they
    register an appropriate codec using compressionRegistry.set().
    
    Example:
    ```ts
    import { Codec, CompressionType, compressionRegistry } from 'apache-arrow';
    import * as lz4js from 'lz4js';

    const lz4Codec: Codec = {
        encode(data: Uint8Array): Uint8Array { return lz4js.compress(data); },
        decode(data: Uint8Array): Uint8Array { return lz4js.decompress(data); }
    };

    compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);
    ```
    This change does not affect writing or serialization.
    
    **This PR includes breaking changes to public APIs.**
    No. The change adds functionality but does not modify any existing API
    behavior.
    
    **This PR contains a "Critical Fix".**
    No. This is a new feature, not a critical fix.
    
    ### Checklist
    
    - [x] All tests pass (`yarn test`)
    - [x] Build completes (`yarn build`)
    - [x] I have added a new test for compressed batches
    
    Closes #109.
---
 package.json                                |   5 +-
 src/Arrow.dom.ts                            |   2 +
 src/Arrow.ts                                |   3 +
 src/ipc/compression/constants.ts            |  19 ++++
 src/ipc/compression/registry.ts             |  46 ++++++++++
 src/ipc/compression/validators.ts           |  92 ++++++++++++++++++++
 src/ipc/metadata/json.ts                    |   3 +-
 src/ipc/metadata/message.ts                 |  68 +++++++++++++--
 src/ipc/reader.ts                           |  94 ++++++++++++++++++--
 src/ipc/serialization.ts                    |   8 +-
 src/ipc/writer.ts                           | 129 +++++++++++++++++++++++-----
 src/visitor/vectorloader.ts                 |  13 ++-
 test/tsconfig.json                          |   1 +
 test/types/zstd-codec.d.ts                  |  22 +++++
 test/unit/ipc/writer/file-writer-tests.ts   |  57 +++++++++++-
 test/unit/ipc/writer/stream-writer-tests.ts |  47 ++++++++++
 yarn.lock                                   |  15 ++++
 17 files changed, 581 insertions(+), 43 deletions(-)

diff --git a/package.json b/package.json
index 4e86499..87119fd 100644
--- a/package.json
+++ b/package.json
@@ -66,6 +66,7 @@
     "@types/benchmark": "2.1.5",
     "@types/glob": "8.1.0",
     "@types/jest": "29.5.14",
+    "@types/lz4js": "0.2.1",
     "@types/multistream": "4.1.3",
     "async-done": "2.0.0",
     "benny": "3.7.1",
@@ -91,6 +92,7 @@
     "ix": "7.0.0",
     "jest": "29.7.0",
     "jest-silent-reporter": "0.6.0",
+    "lz4js": "0.2.0",
     "memfs": "4.38.1",
     "mkdirp": "3.0.1",
     "multistream": "4.1.0",
@@ -106,7 +108,8 @@
     "web-streams-polyfill": "3.2.1",
     "webpack": "5.101.3",
     "webpack-bundle-analyzer": "4.10.2",
-    "webpack-stream": "7.0.0"
+    "webpack-stream": "7.0.0",
+    "zstd-codec": "0.1.5"
   },
   "engines": {
     "node": ">=20.0"
diff --git a/src/Arrow.dom.ts b/src/Arrow.dom.ts
index b6e3fbc..e0cd681 100644
--- a/src/Arrow.dom.ts
+++ b/src/Arrow.dom.ts
@@ -38,6 +38,7 @@ export type {
     ReadableSource, WritableSink,
     ArrowJSONLike, FileHandle, Readable, Writable, ReadableWritable, 
ReadableDOMStreamOptions,
     IntervalDayTimeObject, IntervalMonthDayNanoObject,
+    Codec
 } from './Arrow.js';
 
 export {
@@ -76,6 +77,7 @@ export {
     RecordBatch,
     util,
     Builder, makeBuilder, builderThroughIterable, builderThroughAsyncIterable,
+    compressionRegistry, CompressionType
 } from './Arrow.js';
 
 export {
diff --git a/src/Arrow.ts b/src/Arrow.ts
index f31f91a..8321026 100644
--- a/src/Arrow.ts
+++ b/src/Arrow.ts
@@ -16,6 +16,7 @@
 // under the License.
 
 export { MessageHeader } from './fb/message-header.js';
+export { CompressionType } from './fb/compression-type.js';
 
 export {
     Type,
@@ -92,6 +93,8 @@ export type { ReadableSource, WritableSink } from 
'./io/stream.js';
 export { RecordBatchReader, RecordBatchFileReader, RecordBatchStreamReader, 
AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader } from 
'./ipc/reader.js';
 export { RecordBatchWriter, RecordBatchFileWriter, RecordBatchStreamWriter, 
RecordBatchJSONWriter } from './ipc/writer.js';
 export { tableToIPC, tableFromIPC } from './ipc/serialization.js';
+export { compressionRegistry } from './ipc/compression/registry.js';
+export type { Codec } from './ipc/compression/registry.js';
 export { MessageReader, AsyncMessageReader, JSONMessageReader } from 
'./ipc/message.js';
 export { Message } from './ipc/metadata/message.js';
 export { RecordBatch } from './recordbatch.js';
diff --git a/src/ipc/compression/constants.ts b/src/ipc/compression/constants.ts
new file mode 100644
index 0000000..bf40754
--- /dev/null
+++ b/src/ipc/compression/constants.ts
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+export const LENGTH_NO_COMPRESSED_DATA = -1;
+export const COMPRESS_LENGTH_PREFIX = 8;
diff --git a/src/ipc/compression/registry.ts b/src/ipc/compression/registry.ts
new file mode 100644
index 0000000..af7d819
--- /dev/null
+++ b/src/ipc/compression/registry.ts
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { CompressionType } from '../../fb/compression-type.js';
+import { compressionValidators } from './validators.js';
+
+export interface Codec {
+    encode?(data: Uint8Array): Uint8Array;
+    decode?(data: Uint8Array): Uint8Array;
+}
+
+class _CompressionRegistry {
+    protected declare registry: { [key in CompressionType]?: Codec };
+
+    constructor() {
+        this.registry = {};
+    }
+
+    set(compression: CompressionType, codec: Codec) {
+        if (codec?.encode && typeof codec.encode === 'function' && 
!compressionValidators[compression].isValidCodecEncode(codec)) {
+            throw new Error(`Encoder for ${CompressionType[compression]} is 
not valid.`);
+        }
+        this.registry[compression] = codec;
+    }
+
+    get(compression: CompressionType): Codec | null {
+        return this.registry?.[compression] || null;
+    }
+
+}
+
+export const compressionRegistry = new _CompressionRegistry();
diff --git a/src/ipc/compression/validators.ts 
b/src/ipc/compression/validators.ts
new file mode 100644
index 0000000..a0b0407
--- /dev/null
+++ b/src/ipc/compression/validators.ts
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import type { Codec } from './registry.ts';
+import { CompressionType } from '../../fb/compression-type.js';
+
+export interface CompressionValidator {
+    isValidCodecEncode(codec: Codec): boolean;
+}
+
+class Lz4FrameValidator implements CompressionValidator {
+    private readonly LZ4_FRAME_MAGIC = new Uint8Array([4, 34, 77, 24]);
+    private readonly MIN_HEADER_LENGTH = 7; // 4 (magic) + 2 (FLG + BD) + 1 
(header checksum) = 7 min bytes
+
+    isValidCodecEncode(codec: Codec): boolean {
+        const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
+        const compressed = codec.encode!(testData);
+        return this._isValidCompressed(compressed);
+    }
+
+    private _isValidCompressed(buffer: Uint8Array): boolean {
+        return (
+            this._hasMinimumLength(buffer) &&
+            this._hasValidMagicNumber(buffer) &&
+            this._hasValidVersion(buffer)
+        );
+    }
+
+    private _hasMinimumLength(buffer: Uint8Array): boolean {
+        return buffer.length >= this.MIN_HEADER_LENGTH;
+    }
+
+    private _hasValidMagicNumber(buffer: Uint8Array): boolean {
+        return this.LZ4_FRAME_MAGIC.every(
+            (byte, i) => buffer[i] === byte
+        );
+    }
+
+    private _hasValidVersion(buffer: Uint8Array): boolean {
+        const flg = buffer[4];
+        const versionBits = (flg & 0xC0) >> 6;
+        return versionBits === 1;
+    }
+
+}
+
+class ZstdValidator implements CompressionValidator {
+    private readonly ZSTD_MAGIC = new Uint8Array([40, 181, 47, 253]);
+    private readonly MIN_HEADER_LENGTH = 6; // 4 (magic) + 2 (min 
Frame_Header) = 6 min bytes
+
+    isValidCodecEncode(codec: Codec): boolean {
+        const testData = new Uint8Array([1, 2, 3, 4, 5, 6, 7, 8]);
+        const compressed = codec.encode!(testData);
+        return this._isValidCompressed(compressed);
+    }
+
+    private _isValidCompressed(buffer: Uint8Array): boolean {
+        return (
+            this._hasMinimumLength(buffer) &&
+            this._hasValidMagicNumber(buffer)
+        );
+    }
+
+    private _hasMinimumLength(buffer: Uint8Array): boolean {
+        return buffer.length >= this.MIN_HEADER_LENGTH;
+    }
+
+    private _hasValidMagicNumber(buffer: Uint8Array): boolean {
+        return this.ZSTD_MAGIC.every(
+            (byte, i) => buffer[i] === byte
+        );
+    }
+}
+
+export const compressionValidators: Record<CompressionType, 
CompressionValidator> = {
+    [CompressionType.LZ4_FRAME]: new Lz4FrameValidator(),
+    [CompressionType.ZSTD]: new ZstdValidator(),
+};
diff --git a/src/ipc/metadata/json.ts b/src/ipc/metadata/json.ts
index bb88f0d..15f8718 100644
--- a/src/ipc/metadata/json.ts
+++ b/src/ipc/metadata/json.ts
@@ -40,7 +40,8 @@ export function recordBatchFromJSON(b: any) {
     return new RecordBatch(
         b['count'],
         fieldNodesFromJSON(b['columns']),
-        buffersFromJSON(b['columns'])
+        buffersFromJSON(b['columns']),
+        null
     );
 }
 
diff --git a/src/ipc/metadata/message.ts b/src/ipc/metadata/message.ts
index d342897..17e8897 100644
--- a/src/ipc/metadata/message.ts
+++ b/src/ipc/metadata/message.ts
@@ -40,6 +40,9 @@ import { FixedSizeBinary as _FixedSizeBinary } from 
'../../fb/fixed-size-binary.
 import { FixedSizeList as _FixedSizeList } from '../../fb/fixed-size-list.js';
 import { Map as _Map } from '../../fb/map.js';
 import { Message as _Message } from '../../fb/message.js';
+import { CompressionType as _CompressionType } from 
'../../fb/compression-type.js';
+import { BodyCompression as _BodyCompression } from 
'../../fb/body-compression.js';
+import { BodyCompressionMethod as _BodyCompressionMethod } from 
'../../fb/body-compression-method.js';
 
 import { Schema, Field } from '../../schema.js';
 import { toUint8Array } from '../../util/buffer.js';
@@ -122,9 +125,11 @@ export class Message<T extends MessageHeader = any> {
     protected _headerType: T;
     protected _bodyLength: number;
     protected _version: MetadataVersion;
+    protected _compression: BodyCompression | null;
     public get type() { return this.headerType; }
     public get version() { return this._version; }
     public get headerType() { return this._headerType; }
+    public get compression() { return this._compression; }
     public get bodyLength() { return this._bodyLength; }
     declare protected _createHeader: MessageHeaderDecoder;
     public header() { return this._createHeader<T>(); }
@@ -136,6 +141,7 @@ export class Message<T extends MessageHeader = any> {
         this._version = version;
         this._headerType = headerType;
         this.body = new Uint8Array(0);
+        this._compression = header?.compression;
         header && (this._createHeader = () => header);
         this._bodyLength = bigIntToNumber(bodyLength);
     }
@@ -149,13 +155,21 @@ export class RecordBatch {
     protected _length: number;
     protected _nodes: FieldNode[];
     protected _buffers: BufferRegion[];
+    protected _compression: BodyCompression | null;
     public get nodes() { return this._nodes; }
     public get length() { return this._length; }
     public get buffers() { return this._buffers; }
-    constructor(length: bigint | number, nodes: FieldNode[], buffers: 
BufferRegion[]) {
+    public get compression() { return this._compression; }
+    constructor(
+        length: bigint | number,
+        nodes: FieldNode[],
+        buffers: BufferRegion[],
+        compression: BodyCompression | null
+    ) {
         this._nodes = nodes;
         this._buffers = buffers;
         this._length = bigIntToNumber(length);
+        this._compression = compression;
     }
 }
 
@@ -208,6 +222,19 @@ export class FieldNode {
     }
 }
 
+/**
+ * @ignore
+ * @private
+ **/
+export class BodyCompression {
+    public type: _CompressionType;
+    public method: _BodyCompressionMethod;
+    constructor(type: _CompressionType, method: _BodyCompressionMethod = 
_BodyCompressionMethod.BUFFER) {
+        this.type = type;
+        this.method = method;
+    }
+}
+
 /** @ignore */
 function messageHeaderFromJSON(message: any, type: MessageHeader) {
     return (() => {
@@ -254,6 +281,9 @@ FieldNode['decode'] = decodeFieldNode;
 BufferRegion['encode'] = encodeBufferRegion;
 BufferRegion['decode'] = decodeBufferRegion;
 
+BodyCompression['encode'] = encodeBodyCompression;
+BodyCompression['decode'] = decodeBodyCompression;
+
 declare module '../../schema' {
     namespace Field {
         export { encodeField as encode };
@@ -286,6 +316,10 @@ declare module './message' {
         export { encodeBufferRegion as encode };
         export { decodeBufferRegion as decode };
     }
+    namespace BodyCompression {
+        export { encodeBodyCompression as encode };
+        export { decodeBodyCompression as decode };
+    }
 }
 
 /** @ignore */
@@ -296,10 +330,13 @@ function decodeSchema(_schema: _Schema, dictionaries: 
Map<number, DataType> = ne
 
 /** @ignore */
 function decodeRecordBatch(batch: _RecordBatch, version = MetadataVersion.V5) {
-    if (batch.compression() !== null) {
-        throw new Error('Record batch compression not implemented');
-    }
-    return new RecordBatch(batch.length(), decodeFieldNodes(batch), 
decodeBuffers(batch, version));
+    const recordBatch = new RecordBatch(
+        batch.length(),
+        decodeFieldNodes(batch),
+        decodeBuffers(batch, version),
+        decodeBodyCompression(batch.compression())
+    );
+    return recordBatch;
 }
 
 /** @ignore */
@@ -491,6 +528,11 @@ function decodeFieldType(f: _Field, children?: Field[]): 
DataType<any> {
     throw new Error(`Unrecognized type: "${Type[typeId]}" (${typeId})`);
 }
 
+/** @ignore */
+function decodeBodyCompression(b: _BodyCompression | null) {
+    return b ? new BodyCompression(b.codec(), b.method()) : null;
+}
+
 /** @ignore */
 function encodeSchema(b: Builder, schema: Schema) {
 
@@ -583,13 +625,29 @@ function encodeRecordBatch(b: Builder, recordBatch: 
RecordBatch) {
 
     const buffersVectorOffset = b.endVector();
 
+    let bodyCompressionOffset = null;
+    if (recordBatch.compression !== null) {
+        bodyCompressionOffset = encodeBodyCompression(b, 
recordBatch.compression);
+    }
+
     _RecordBatch.startRecordBatch(b);
     _RecordBatch.addLength(b, BigInt(recordBatch.length));
     _RecordBatch.addNodes(b, nodesVectorOffset);
     _RecordBatch.addBuffers(b, buffersVectorOffset);
+    if (recordBatch.compression !== null && bodyCompressionOffset) {
+        _RecordBatch.addCompression(b, bodyCompressionOffset);
+    }
     return _RecordBatch.endRecordBatch(b);
 }
 
+/** @ignore */
+function encodeBodyCompression(b: Builder, node: BodyCompression) {
+    _BodyCompression.startBodyCompression(b);
+    _BodyCompression.addCodec(b, node.type);
+    _BodyCompression.addMethod(b, node.method);
+    return _BodyCompression.endBodyCompression(b);
+}
+
 /** @ignore */
 function encodeDictionaryBatch(b: Builder, dictionaryBatch: DictionaryBatch) {
     const dataOffset = RecordBatch.encode(b, dictionaryBatch.data);
diff --git a/src/ipc/reader.ts b/src/ipc/reader.ts
index f84fe83..e36eeb5 100644
--- a/src/ipc/reader.ts
+++ b/src/ipc/reader.ts
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-import { makeData } from '../data.js';
+import { Data, makeData } from '../data.js';
 import { Vector } from '../vector.js';
 import { DataType, Struct, TypeMap } from '../type.js';
 import { MessageHeader } from '../enum.js';
@@ -27,7 +27,7 @@ import * as metadata from './metadata/message.js';
 import { ArrayBufferViewInput } from '../util/buffer.js';
 import { ByteStream, AsyncByteStream } from '../io/stream.js';
 import { RandomAccessFile, AsyncRandomAccessFile } from '../io/file.js';
-import { VectorLoader, JSONVectorLoader } from '../visitor/vectorloader.js';
+import { VectorLoader, JSONVectorLoader, CompressedVectorLoader } from 
'../visitor/vectorloader.js';
 import { RecordBatch, _InternalEmptyPlaceholderRecordBatch } from 
'../recordbatch.js';
 import {
     FileHandle,
@@ -46,8 +46,12 @@ import {
     isFileHandle, isFetchResponse,
     isReadableDOMStream, isReadableNodeStream
 } from '../util/compat.js';
+import { Codec, compressionRegistry } from './compression/registry.js';
+import { bigIntToNumber } from './../util/bigint.js';
+import * as flatbuffers from 'flatbuffers';
 
 import type { DuplexOptions, Duplex } from 'node:stream';
+import { COMPRESS_LENGTH_PREFIX, LENGTH_NO_COMPRESSED_DATA } from 
'./compression/constants.js';
 
 /** @ignore */ export type FromArg0 = ArrowJSONLike;
 /** @ignore */ export type FromArg1 = PromiseLike<ArrowJSONLike>;
@@ -354,24 +358,100 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = 
any> implements RecordB
         return this;
     }
 
-    protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
-        const children = this._loadVectors(header, body, this.schema.fields);
+    protected _loadRecordBatch(header: metadata.RecordBatch, body: 
Uint8Array): RecordBatch<T> {
+        let children: Data<any>[];
+        if (header.compression != null) {
+            const codec = compressionRegistry.get(header.compression.type);
+            if (codec?.decode && typeof codec.decode === 'function') {
+                const { decommpressedBody, buffers } = 
this._decompressBuffers(header, body, codec);
+                children = this._loadCompressedVectors(header, 
decommpressedBody, this.schema.fields);
+                header = new metadata.RecordBatch(
+                    header.length,
+                    header.nodes,
+                    buffers,
+                    null
+                );
+            } else {
+                throw new Error('Record batch is compressed but codec not 
found');
+            }
+        } else {
+            children = this._loadVectors(header, body, this.schema.fields);
+        }
+
         const data = makeData({ type: new Struct(this.schema.fields), length: 
header.length, children });
         return new RecordBatch(this.schema, data);
     }
-    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: 
any) {
+
+    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: 
Uint8Array) {
         const { id, isDelta } = header;
         const { dictionaries, schema } = this;
         const dictionary = dictionaries.get(id);
         const type = schema.dictionaries.get(id)!;
-        const data = this._loadVectors(header.data, body, [type]);
+        let data: Data<any>[];
+        if (header.data.compression != null) {
+            const codec = 
compressionRegistry.get(header.data.compression.type);
+            if (codec?.decode && typeof codec.decode === 'function') {
+                const { decommpressedBody, buffers } = 
this._decompressBuffers(header.data, body, codec);
+                data = this._loadCompressedVectors(header.data, 
decommpressedBody, [type]);
+                header = new metadata.DictionaryBatch(new metadata.RecordBatch(
+                    header.data.length,
+                    header.data.nodes,
+                    buffers,
+                    null
+                ), id, isDelta)
+            } else {
+                throw new Error('Dictionary batch is compressed but codec not 
found');
+            }
+        } else {
+            data = this._loadVectors(header.data, body, [type]);
+        }
+        // const data = this._loadVectors(header.data, body, [type]);
         return (dictionary && isDelta ? dictionary.concat(
             new Vector(data)) :
             new Vector(data)).memoize() as Vector;
     }
-    protected _loadVectors(header: metadata.RecordBatch, body: any, types: 
(Field | DataType)[]) {
+
+    protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, 
types: (Field | DataType)[]) {
         return new VectorLoader(body, header.nodes, header.buffers, 
this.dictionaries, this.schema.metadataVersion).visitMany(types);
     }
+
+    protected _loadCompressedVectors(header: metadata.RecordBatch, body: 
Uint8Array[], types: (Field | DataType)[]) {
+        return new CompressedVectorLoader(body, header.nodes, header.buffers, 
this.dictionaries, this.schema.metadataVersion).visitMany(types);
+    }
+
+    private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, 
codec: Codec): { decommpressedBody: Uint8Array[]; buffers: 
metadata.BufferRegion[] } {
+        const decompressedBuffers: Uint8Array[] = [];
+        const newBufferRegions: metadata.BufferRegion[] = [];
+
+        let currentOffset = 0;
+        for (const { offset, length } of header.buffers) {
+            if (length === 0) {
+                decompressedBuffers.push(new Uint8Array(0));
+                newBufferRegions.push(new metadata.BufferRegion(currentOffset, 
0));
+                continue;
+            }
+            const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, 
offset + length));
+            const uncompressedLenth = bigIntToNumber(byteBuf.readInt64(0));
+
+            const bytes = byteBuf.bytes().subarray(COMPRESS_LENGTH_PREFIX);
+
+            const decompressed = (uncompressedLenth === 
LENGTH_NO_COMPRESSED_DATA)
+                ? bytes
+                : codec.decode!(bytes);
+
+            decompressedBuffers.push(decompressed);
+
+            const padding = ((currentOffset + 7) & ~7) - currentOffset;
+            currentOffset += padding;
+            newBufferRegions.push(new metadata.BufferRegion(currentOffset, 
decompressed.length));
+            currentOffset += decompressed.length;
+        }
+
+        return {
+            decommpressedBody: decompressedBuffers,
+            buffers: newBufferRegions
+        };
+    }
 }
 
 /** @ignore */
diff --git a/src/ipc/serialization.ts b/src/ipc/serialization.ts
index aee4676..c437ffe 100644
--- a/src/ipc/serialization.ts
+++ b/src/ipc/serialization.ts
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import { CompressionType } from '../fb/compression-type.js';
 import { Table } from '../table.js';
 import { TypeMap } from '../type.js';
 import { isPromise } from '../util/compat.js';
@@ -24,7 +25,7 @@ import {
     RecordBatchFileReader, RecordBatchStreamReader,
     AsyncRecordBatchFileReader, AsyncRecordBatchStreamReader
 } from './reader.js';
-import { RecordBatchFileWriter, RecordBatchStreamWriter } from './writer.js';
+import { RecordBatchFileWriter, RecordBatchStreamWriter, 
RecordBatchStreamWriterOptions } from './writer.js';
 
 type RecordBatchReaders<T extends TypeMap = any> = RecordBatchFileReader<T> | 
RecordBatchStreamReader<T>;
 type AsyncRecordBatchReaders<T extends TypeMap = any> = 
AsyncRecordBatchFileReader<T> | AsyncRecordBatchStreamReader<T>;
@@ -58,8 +59,9 @@ export function tableFromIPC<T extends TypeMap = any>(input: 
any): Table<T> | Pr
  * @param table The Table to serialize.
  * @param type Whether to serialize the Table as a file or a stream.
  */
-export function tableToIPC<T extends TypeMap = any>(table: Table, type: 'file' 
| 'stream' = 'stream'): Uint8Array {
+export function tableToIPC<T extends TypeMap = any>(table: Table, type: 'file' 
| 'stream' = 'stream', compressionType: CompressionType | null = null): 
Uint8Array {
+    const writerOptions: RecordBatchStreamWriterOptions = { compressionType };
     return (type === 'stream' ? RecordBatchStreamWriter : 
RecordBatchFileWriter)
-        .writeAll<T>(table)
+        .writeAll<T>(table, writerOptions)
         .toUint8Array(true);
 }
diff --git a/src/ipc/writer.ts b/src/ipc/writer.ts
index cb74fe6..17c8f0b 100644
--- a/src/ipc/writer.ts
+++ b/src/ipc/writer.ts
@@ -36,6 +40,10 @@ import { Writable, ReadableInterop, ReadableDOMStreamOptions } from '../io/inter
 import { isPromise, isAsyncIterable, isWritableDOMStream, isWritableNodeStream, isIterable, isObject } from '../util/compat.js';
 
 import type { DuplexOptions, Duplex, ReadableOptions } from 'node:stream';
+import { CompressionType } from '../fb/compression-type.js';
+import { compressionRegistry } from './compression/registry.js';
+import { LENGTH_NO_COMPRESSED_DATA, COMPRESS_LENGTH_PREFIX } from './compression/constants.js';
+import * as flatbuffers from 'flatbuffers';
 
 export interface RecordBatchStreamWriterOptions {
     /**
@@ -49,6 +53,10 @@ export interface RecordBatchStreamWriterOptions {
      * @see https://issues.apache.org/jira/browse/ARROW-6313
      */
     writeLegacyIpcFormat?: boolean;
+    /**
+     * Specifies the optional compression algorithm to use for record batch body buffers.
+     */
+    compressionType?: CompressionType | null;
 }
 
 export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<Uint8Array> implements Writable<RecordBatch<T>> {
@@ -70,15 +78,30 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
 
     constructor(options?: RecordBatchStreamWriterOptions) {
         super();
-        isObject(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false });
+        isObject(options) || (options = { autoDestroy: true, writeLegacyIpcFormat: false, compressionType: null });
         this._autoDestroy = (typeof options.autoDestroy === 'boolean') ? options.autoDestroy : true;
         this._writeLegacyIpcFormat = (typeof options.writeLegacyIpcFormat === 'boolean') ? options.writeLegacyIpcFormat : false;
+        if (options.compressionType != null) {
+            if (this._writeLegacyIpcFormat) {
+                throw new Error('Legacy IPC format does not support columnar compression. Use modern IPC format (writeLegacyIpcFormat=false).');
+            }
+            if (Object.values(CompressionType).includes(options.compressionType)) {
+                this._compression = new metadata.BodyCompression(options.compressionType);
+            } else {
+                const validCompressionTypes = Object.values(CompressionType)
+                    .filter((v): v is string => typeof v === 'string');
+                throw new Error(`Unsupported compressionType: ${options.compressionType} Available types: ${validCompressionTypes.join(', ')}`);
+            }
+        } else {
+            this._compression = null;
+        }
     }
 
     protected _position = 0;
     protected _started = false;
     protected _autoDestroy: boolean;
     protected _writeLegacyIpcFormat: boolean;
+    protected _compression: metadata.BodyCompression | null = null;
     // @ts-ignore
     protected _sink = new AsyncByteQueue();
     protected _schema: Schema | null = null;
@@ -251,8 +274,8 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
     }
 
     protected _writeRecordBatch(batch: RecordBatch<T>) {
-        const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
-        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions);
+        const { byteLength, nodes, bufferRegions, buffers } = this._assembleRecordBatch(batch);
+        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression);
         const message = Message.from(recordBatch, byteLength);
         return this
             ._writeDictionaries(batch)
@@ -260,9 +283,62 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
             ._writeBodyBuffers(buffers);
     }
 
+    protected _assembleRecordBatch(batch: RecordBatch<T> | Vector) {
+        let { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(batch);
+        if (this._compression != null) {
+            ({ byteLength, bufferRegions, buffers } = this._compressBodyBuffers(buffers));
+        }
+        return { byteLength, nodes, bufferRegions, buffers };
+    }
+
+    protected _compressBodyBuffers(buffers: ArrayBufferView[]) {
+        const codec = compressionRegistry.get(this._compression!.type!);
+
+        if (!codec?.encode || typeof codec.encode !== 'function') {
+            throw new Error(`Codec for compression type "${CompressionType[this._compression!.type!]}" has invalid encode method`);
+        }
+
+        let currentOffset = 0;
+        const compressedBuffers: ArrayBufferView[] = [];
+        const bufferRegions: metadata.BufferRegion[] = [];
+
+        for (const buffer of buffers) {
+            const byteBuf = toUint8Array(buffer);
+
+            if (byteBuf.length === 0) {
+                compressedBuffers.push(new Uint8Array(0), new Uint8Array(0));
+                bufferRegions.push(new metadata.BufferRegion(currentOffset, 0));
+                continue;
+            }
+
+            const compressed = codec.encode(byteBuf);
+            const isCompressionEffective = compressed.length < byteBuf.length;
+
+            const finalBuffer = isCompressionEffective ? compressed : byteBuf;
+            const byteLength = isCompressionEffective ? finalBuffer.length : LENGTH_NO_COMPRESSED_DATA;
+
+            const lengthPrefix = new flatbuffers.ByteBuffer(new Uint8Array(COMPRESS_LENGTH_PREFIX));
+            lengthPrefix.writeInt64(0, BigInt(byteLength));
+
+            compressedBuffers.push(lengthPrefix.bytes(), new Uint8Array(finalBuffer));
+
+            const padding = ((currentOffset + 7) & ~7) - currentOffset;
+            currentOffset += padding;
+
+            const fullBodyLength = COMPRESS_LENGTH_PREFIX + finalBuffer.length;
+            bufferRegions.push(new metadata.BufferRegion(currentOffset, fullBodyLength));
+
+            currentOffset += fullBodyLength;
+        }
+        const finalPadding = ((currentOffset + 7) & ~7) - currentOffset;
+        currentOffset += finalPadding;
+
+        return { byteLength: currentOffset, bufferRegions, buffers: compressedBuffers };
+    }
+
     protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = false) {
-        const { byteLength, nodes, bufferRegions, buffers } = VectorAssembler.assemble(new Vector([dictionary]));
-        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions);
+        const { byteLength, nodes, bufferRegions, buffers } = this._assembleRecordBatch(new Vector([dictionary]));
+        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, bufferRegions, this._compression);
         const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, isDelta);
         const message = Message.from(dictionaryBatch, byteLength);
         return this
@@ -271,14 +347,24 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
     }
 
     protected _writeBodyBuffers(buffers: ArrayBufferView[]) {
-        let buffer: ArrayBufferView;
-        let size: number, padding: number;
-        for (let i = -1, n = buffers.length; ++i < n;) {
-            if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) {
-                this._write(buffer);
-                if ((padding = ((size + 7) & ~7) - size) > 0) {
-                    this._writePadding(padding);
-                }
+        const bufGroupSize = this._compression != null ? 2 : 1;
+        const bufs = new Array(bufGroupSize);
+
+        for (let i = 0; i < buffers.length; i += bufGroupSize) {
+            let size = 0;
+            for (let j = -1; ++j < bufGroupSize;) {
+                bufs[j] = buffers[i + j];
+                size += bufs[j].byteLength;
+            }
+
+            if (size === 0) {
+                continue;
+            }
+
+            for (const buf of bufs) this._write(buf);
+            const padding = ((size + 7) & ~7) - size;
+            if (padding > 0) {
+                this._writePadding(padding);
             }
         }
         return this;
@@ -325,13 +411,13 @@ export class RecordBatchStreamWriter<T extends TypeMap = any> extends RecordBatc
 
 /** @ignore */
 export class RecordBatchFileWriter<T extends TypeMap = any> extends RecordBatchWriter<T> {
-    public static writeAll<T extends TypeMap = any>(input: Table<T> | Iterable<RecordBatch<T>>): RecordBatchFileWriter<T>;
-    public static writeAll<T extends TypeMap = any>(input: AsyncIterable<RecordBatch<T>>): Promise<RecordBatchFileWriter<T>>;
-    public static writeAll<T extends TypeMap = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>): Promise<RecordBatchFileWriter<T>>;
-    public static writeAll<T extends TypeMap = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>): Promise<RecordBatchFileWriter<T>>;
+    public static writeAll<T extends TypeMap = any>(input: Table<T> | Iterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): RecordBatchFileWriter<T>;
+    public static writeAll<T extends TypeMap = any>(input: AsyncIterable<RecordBatch<T>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchFileWriter<T>>;
+    public static writeAll<T extends TypeMap = any>(input: PromiseLike<AsyncIterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchFileWriter<T>>;
+    public static writeAll<T extends TypeMap = any>(input: PromiseLike<Table<T> | Iterable<RecordBatch<T>>>, options?: RecordBatchStreamWriterOptions): Promise<RecordBatchFileWriter<T>>;
     /** @nocollapse */
-    public static writeAll<T extends TypeMap = any>(input: any) {
-        const writer = new RecordBatchFileWriter<T>();
+    public static writeAll<T extends TypeMap = any>(input: any, options?: RecordBatchStreamWriterOptions) {
+        const writer = new RecordBatchFileWriter<T>(options);
         if (isPromise<any>(input)) {
             return input.then((x) => writer.writeAll(x));
         } else if (isAsyncIterable<RecordBatch<T>>(input)) {
@@ -340,9 +426,10 @@ export class RecordBatchFileWriter<T extends TypeMap = any> extends RecordBatchW
         return writeAll(writer, input);
     }
 
-    constructor() {
-        super();
+    constructor(options?: RecordBatchStreamWriterOptions) {
+        super(options);
         this._autoDestroy = true;
+        this._writeLegacyIpcFormat = false;
     }
 
     // @ts-ignore
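The body layout produced by `_compressBodyBuffers` above can be sketched in isolation: each non-empty buffer is written as an 8-byte little-endian int64 prefix (holding the length value the patch computes, or the `LENGTH_NO_COMPRESSED_DATA` sentinel of -1 when the codec output is not smaller than the input), followed by the payload, with every region starting on an 8-byte boundary. A minimal, self-contained sketch of that arithmetic, assuming `COMPRESS_LENGTH_PREFIX` = 8 and `LENGTH_NO_COMPRESSED_DATA` = -1 (the constants imported from `./compression/constants.js`):

```typescript
// Standalone sketch of the framing/alignment arithmetic; assumed constants
// mirror the names imported from './compression/constants.js' in the patch.
const COMPRESS_LENGTH_PREFIX = 8;
const LENGTH_NO_COMPRESSED_DATA = -1;

// Round an offset up to the next 8-byte boundary, as in ((n + 7) & ~7).
function align8(offset: number): number {
    return (offset + 7) & ~7;
}

// Frame a payload as [int64 little-endian length prefix][payload bytes].
function frame(payload: Uint8Array, storedLength: number): Uint8Array {
    const out = new Uint8Array(COMPRESS_LENGTH_PREFIX + payload.length);
    new DataView(out.buffer).setBigInt64(0, BigInt(storedLength), true);
    out.set(payload, COMPRESS_LENGTH_PREFIX);
    return out;
}

// A 3-byte payload stored uncompressed: prefix carries the -1 sentinel.
const body = frame(new Uint8Array([1, 2, 3]), LENGTH_NO_COMPRESSED_DATA);
console.log(body.length);          // 11: 8-byte prefix + 3 payload bytes
console.log(align8(body.length));  // 16: next buffer region starts here
```

The same `(n + 7) & ~7` rounding appears twice in the patch: once per buffer region and once as the final padding that sets the message's total `byteLength`.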
diff --git a/src/visitor/vectorloader.ts b/src/visitor/vectorloader.ts
index 198c32f..7c82e7a 100644
--- a/src/visitor/vectorloader.ts
+++ b/src/visitor/vectorloader.ts
@@ -41,7 +41,7 @@ export class VectorLoader extends Visitor {
     private nodes: FieldNode[];
     private nodesIndex = -1;
     private buffers: BufferRegion[];
-    private buffersIndex = -1;
+    protected buffersIndex = -1;
     private dictionaries: Map<number, Vector<any>>;
     private readonly metadataVersion: MetadataVersion;
     constructor(bytes: Uint8Array, nodes: FieldNode[], buffers: BufferRegion[], dictionaries: Map<number, Vector<any>>, metadataVersion = MetadataVersion.V5) {
@@ -205,3 +205,14 @@ function binaryDataFromJSON(values: string[]) {
     }
     return data;
 }
+
+export class CompressedVectorLoader extends VectorLoader {
+    private bodyChunks: Uint8Array[];
+    constructor(bodyChunks: Uint8Array[], nodes: FieldNode[], buffers: BufferRegion[], dictionaries: Map<number, Vector<any>>, metadataVersion: MetadataVersion) {
+        super(new Uint8Array(0), nodes, buffers, dictionaries, metadataVersion);
+        this.bodyChunks = bodyChunks;
+    }
+    protected readData<T extends DataType>(_type: T, _buffer = this.nextBufferRange()) {
+        return this.bodyChunks[this.buffersIndex];
+    }
+}
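On the read path, `CompressedVectorLoader` is handed body chunks that have already been decompressed (the commit description notes the decompression itself lives in `_loadRecordBatch()`). A hedged sketch of the per-buffer unframing such a step performs, with an illustrative `unframe` helper and a `decode` callback standing in for a real codec such as `lz4js.decompress`:

```typescript
// Illustrative unframing of one compressed buffer region: an 8-byte
// little-endian int64 prefix, then the payload. A prefix value of -1
// marks a buffer that was stored uncompressed. The helper name and the
// injected `decode` callback are assumptions, not the library's API.
function unframe(region: Uint8Array, decode: (b: Uint8Array) => Uint8Array): Uint8Array {
    const view = new DataView(region.buffer, region.byteOffset, region.byteLength);
    const storedLength = view.getBigInt64(0, true);
    const payload = region.subarray(8);
    // -1 sentinel: the payload bytes are already the raw data.
    return storedLength === -1n ? payload : decode(payload);
}
```

Given such a helper, a reader would map every framed region to its raw bytes and pass the resulting chunk list to `CompressedVectorLoader`, whose overridden `readData` simply indexes into them.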
diff --git a/test/tsconfig.json b/test/tsconfig.json
index bd43e09..e1ad138 100644
--- a/test/tsconfig.json
+++ b/test/tsconfig.json
@@ -17,6 +17,7 @@
     "inlineSourceMap": false,
     "downlevelIteration": false,
     "baseUrl": "../",
+    "typeRoots": ["../node_modules/@types", "./types"],
     "paths": {
       "apache-arrow": ["src/Arrow.node"],
       "apache-arrow/*": ["src/*"]
diff --git a/test/types/zstd-codec.d.ts b/test/types/zstd-codec.d.ts
new file mode 100644
index 0000000..76176d3
--- /dev/null
+++ b/test/types/zstd-codec.d.ts
@@ -0,0 +1,22 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+declare module 'zstd-codec' {
+    export const ZstdCodec: {
+        run(callback: (zstd: any) => void): void;
+    };
+}
diff --git a/test/unit/ipc/writer/file-writer-tests.ts b/test/unit/ipc/writer/file-writer-tests.ts
index 2b99d0f..f6632d8 100644
--- a/test/unit/ipc/writer/file-writer-tests.ts
+++ b/test/unit/ipc/writer/file-writer-tests.ts
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+import { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer.js';
 import {
     generateDictionaryTables, generateRandomTables
 } from '../../../data/tables.js';
@@ -23,6 +24,9 @@ import { validateRecordBatchIterator } from '../validate.js';
 
 import {
     builderThroughIterable,
+    Codec,
+    compressionRegistry,
+    CompressionType,
     Dictionary,
     Int32,
     RecordBatch,
@@ -32,6 +36,40 @@ import {
     Uint32,
     Vector
 } from 'apache-arrow';
+import * as lz4js from 'lz4js';
+
+export async function registerCompressionCodecs(): Promise<void> {
+    if (compressionRegistry.get(CompressionType.LZ4_FRAME) === null) {
+        const lz4Codec: Codec = {
+            encode(data: Uint8Array): Uint8Array {
+                return lz4js.compress(data);
+            },
+            decode(data: Uint8Array): Uint8Array {
+                return lz4js.decompress(data);
+            }
+        };
+        compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);
+    }
+
+    if (compressionRegistry.get(CompressionType.ZSTD) === null) {
+        const { ZstdCodec } = await import('zstd-codec');
+        await new Promise<void>((resolve) => {
+            ZstdCodec.run((zstd: any) => {
+                const simple = new zstd.Simple();
+                const zstdCodec: Codec = {
+                    encode(data: Uint8Array): Uint8Array {
+                        return simple.compress(data);
+                    },
+                    decode(data: Uint8Array): Uint8Array {
+                        return simple.decompress(data);
+                    }
+                };
+                compressionRegistry.set(CompressionType.ZSTD, zstdCodec);
+                resolve();
+            });
+        });
+    }
+}
 
 describe('RecordBatchFileWriter', () => {
     for (const table of generateRandomTables([10, 20, 30])) {
@@ -41,6 +79,17 @@ describe('RecordBatchFileWriter', () => {
         testFileWriter(table, `${table.schema.fields[0]}`);
     }
 
+    const compressionTypes = [CompressionType.LZ4_FRAME, CompressionType.ZSTD];
+    beforeAll(async () => {
+        await registerCompressionCodecs();
+    });
+
+    const table = generate.table([10, 20, 30]).table;
+    for (const compressionType of compressionTypes) {
+        const testName = `[${table.schema.fields.join(', ')}] - ${CompressionType[compressionType]} compressed`;
+        testFileWriter(table, testName, { compressionType });
+    }
+
     it('should throw if attempting to write replacement dictionary batches', async () => {
         const type = new Dictionary<Uint32, Int32>(new Uint32, new Int32, 0);
         const writer = new RecordBatchFileWriter();
@@ -91,14 +140,14 @@ describe('RecordBatchFileWriter', () => {
     });
 });
 
-function testFileWriter(table: Table, name: string) {
+function testFileWriter(table: Table, name: string, options?: RecordBatchStreamWriterOptions) {
     describe(`should write the Arrow IPC file format (${name})`, () => {
-        test(`Table`, validateTable.bind(0, table));
+        test(`Table`, validateTable.bind(0, table, options));
     });
 }
 
-async function validateTable(source: Table) {
-    const writer = RecordBatchFileWriter.writeAll(source);
+async function validateTable(source: Table, options?: RecordBatchStreamWriterOptions) {
+    const writer = RecordBatchFileWriter.writeAll(source, options);
     const result = new Table(RecordBatchReader.from(await writer.toUint8Array()));
     validateRecordBatchIterator(3, source.batches);
     expect(result).toEqualTable(source);
diff --git a/test/unit/ipc/writer/stream-writer-tests.ts b/test/unit/ipc/writer/stream-writer-tests.ts
index 11bbe73..2c2e3d3 100644
--- a/test/unit/ipc/writer/stream-writer-tests.ts
+++ b/test/unit/ipc/writer/stream-writer-tests.ts
@@ -25,6 +25,9 @@ import { validateRecordBatchIterator } from '../validate.js';
 import type { RecordBatchStreamWriterOptions } from 'apache-arrow/ipc/writer';
 import {
     builderThroughIterable,
+    Codec,
+    compressionRegistry,
+    CompressionType,
     Data,
     Dictionary,
     Field,
@@ -37,6 +40,40 @@ import {
     Uint32,
     Vector
 } from 'apache-arrow';
+import * as lz4js from 'lz4js';
+
+export async function registerCompressionCodecs(): Promise<void> {
+    if (compressionRegistry.get(CompressionType.LZ4_FRAME) === null) {
+        const lz4Codec: Codec = {
+            encode(data: Uint8Array): Uint8Array {
+                return lz4js.compress(data);
+            },
+            decode(data: Uint8Array): Uint8Array {
+                return lz4js.decompress(data);
+            }
+        };
+        compressionRegistry.set(CompressionType.LZ4_FRAME, lz4Codec);
+    }
+
+    if (compressionRegistry.get(CompressionType.ZSTD) === null) {
+        const { ZstdCodec } = await import('zstd-codec');
+        await new Promise<void>((resolve) => {
+            ZstdCodec.run((zstd: any) => {
+                const simple = new zstd.Simple();
+                const zstdCodec: Codec = {
+                    encode(data: Uint8Array): Uint8Array {
+                        return simple.compress(data);
+                    },
+                    decode(data: Uint8Array): Uint8Array {
+                        return simple.decompress(data);
+                    }
+                };
+                compressionRegistry.set(CompressionType.ZSTD, zstdCodec);
+                resolve();
+            });
+        });
+    }
+}
 
 describe('RecordBatchStreamWriter', () => {
 
@@ -47,6 +84,16 @@ describe('RecordBatchStreamWriter', () => {
     testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
     testStreamWriter(table, testName, { writeLegacyIpcFormat: false });
 
+    const compressionTypes = [CompressionType.LZ4_FRAME, CompressionType.ZSTD];
+    beforeAll(async () => {
+        await registerCompressionCodecs();
+    });
+
+    for (const compressionType of compressionTypes) {
+        const testName = `[${table.schema.fields.join(', ')}] - ${CompressionType[compressionType]} compressed`;
+        testStreamWriter(table, testName, { compressionType });
+    }
+
     for (const table of generateRandomTables([10, 20, 30])) {
         const testName = `[${table.schema.fields.join(', ')}]`;
         testStreamWriter(table, testName, { writeLegacyIpcFormat: true });
diff --git a/yarn.lock b/yarn.lock
index 5e09981..865e7c8 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -1556,6 +1556,11 @@
  resolved "https://registry.yarnpkg.com/@types/json-schema/-/json-schema-7.0.15.tgz#596a1747233694d50f6ad8a7869fcb6f56cf5841";
  integrity sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==
 
+"@types/[email protected]":
+  version "0.2.1"
+  resolved "https://registry.yarnpkg.com/@types/lz4js/-/lz4js-0.2.1.tgz#44214fe6b28187ff36eee03afc2b344cbd886b3e";
+  integrity sha512-aAnbA4uKPNqZqu0XK1QAwKP0Wskb4Oa7ZFqxW5CMIyGgqYQKFgBxTfK3m3KODXoOLv5t14VregzgrEak13uGQA==
+
 "@types/minimatch@^5.1.2":
   version "5.1.2"
  resolved "https://registry.yarnpkg.com/@types/minimatch/-/minimatch-5.1.2.tgz#07508b45797cb81ec3f273011b054cd0755eddca";
@@ -4746,6 +4751,11 @@ lunr@^2.3.9:
  resolved "https://registry.yarnpkg.com/lunr/-/lunr-2.3.9.tgz#18b123142832337dd6e964df1a5a7707b25d35e1";
  integrity sha512-zTU3DaZaF3Rt9rhN3uBMGQD3dD2/vFQqnvZCDv4dl5iOzq2IZQqTxu90r4E5J+nP70J3ilqVCrbho2eWaeW8Ow==
 
[email protected]:
+  version "0.2.0"
+  resolved "https://registry.yarnpkg.com/lz4js/-/lz4js-0.2.0.tgz#09f1a397cb2158f675146c3351dde85058cb322f";
+  integrity sha512-gY2Ia9Lm7Ep8qMiuGRhvUq0Q7qUereeldZPP1PMEJxPtEWHJLqw9pgX68oHajBH0nzJK4MaZEA/YNV3jT8u8Bg==
+
 make-dir@^4.0.0:
   version "4.0.0"
  resolved "https://registry.yarnpkg.com/make-dir/-/make-dir-4.0.0.tgz#c3c2307a771277cd9638305f915c29ae741b614e";
@@ -6615,3 +6625,8 @@ yocto-queue@^0.1.0:
   version "0.1.0"
  resolved "https://registry.yarnpkg.com/yocto-queue/-/yocto-queue-0.1.0.tgz#0294eb3dee05028d31ee1a5fa2c556a6aaf10a1b";
  integrity sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==
+
[email protected]:
+  version "0.1.5"
+  resolved "https://registry.yarnpkg.com/zstd-codec/-/zstd-codec-0.1.5.tgz#c180193e4603ef74ddf704bcc835397d30a60e42";
+  integrity sha512-v3fyjpK8S/dpY/X5WxqTK3IoCnp/ZOLxn144GZVlNUjtwAchzrVo03h+oMATFhCIiJ5KTr4V3vDQQYz4RU684g==
