Djjanks commented on code in PR #14:
URL: https://github.com/apache/arrow-js/pull/14#discussion_r2110437456


##########
src/ipc/reader.ts:
##########
@@ -369,9 +389,51 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = 
any> implements RecordB
             new Vector(data)) :
             new Vector(data)).memoize() as Vector;
     }
-    protected _loadVectors(header: metadata.RecordBatch, body: any, types: 
(Field | DataType)[]) {
+    protected _loadVectors(header: metadata.RecordBatch, body: Uint8Array, 
types: (Field | DataType)[]) {
         return new VectorLoader(body, header.nodes, header.buffers, 
this.dictionaries, this.schema.metadataVersion).visitMany(types);
     }
+
+    private _decompressBuffers(header: metadata.RecordBatch, body: Uint8Array, 
codec: Codec): { decommpressedBody: Uint8Array; buffers: 
metadata.BufferRegion[] } {
+        const decompressedBuffers: Uint8Array[] = [];
+        const newBufferRegions: metadata.BufferRegion[] = [];
+
+        let currentOffset = 0;
+        for (const { offset, length } of header.buffers) {
+            if (length === 0) {
+                decompressedBuffers.push(new Uint8Array(0));
+                newBufferRegions.push(new metadata.BufferRegion(currentOffset, 
0));
+                continue;
+            }
+            const byteBuf = new flatbuffers.ByteBuffer(body.subarray(offset, 
offset + length));
+            const uncompressedLenth = bigIntToNumber(byteBuf.readInt64(0));
+
+
+            const bytes = byteBuf.bytes().subarray(LENGTH_OF_PREFIX_DATA);
+
+            const decompressed = (uncompressedLenth === 
LENGTH_NO_COMPRESSED_DATA)
+                ? bytes
+                : codec.decode!(bytes);
+
+            decompressedBuffers.push(decompressed);
+
+            const padding = (DEFAULT_ALIGNMENT - (currentOffset % 
DEFAULT_ALIGNMENT)) % DEFAULT_ALIGNMENT;
+            currentOffset += padding;
+            newBufferRegions.push(new metadata.BufferRegion(currentOffset, 
decompressed.length));
+            currentOffset += decompressed.length;
+        }
+
+        const totalSize = currentOffset;
+        const combined = new Uint8Array(totalSize);
+
+        for (const [i, decompressedBuffer] of decompressedBuffers.entries()) {
+            combined.set(decompressedBuffer, newBufferRegions[i].offset);

Review Comment:
   Hi, @trxcllnt!
   
   I've implemented compression support for the reader and performed some minor 
refactoring to improve the structure. Here are the key changes:
   
   - Added compression support for the writer (debugged and tested)
   - Successfully verified LZ4 writer locally - it works correctly
   - Small refactoring to streamline the code
   - Introduced codec validators to prevent potential library mismatch issues
   
   The main motivation for validators came from realizing that the current 
`CompressionRegistry` approach might cause problems for users when trying to 
match compression/decompression libraries across different environments.
   
   Could you please review my changes, especially the validation logic? Maybe 
you can suggest something about ZSTD validation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to