This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new b0d8d82de6 GH-43015: [Go] Reuse column compressors in IPC module 
(#43016)
b0d8d82de6 is described below

commit b0d8d82de65c55a2ec975ed39f01e3f0d5f9e7ec
Author: Wyatt Alt <[email protected]>
AuthorDate: Tue Jun 25 12:27:35 2024 -0700

    GH-43015: [Go] Reuse column compressors in IPC module (#43016)
    
    This changes the IPC writer to reuse lz4 or zstd compressors instead of
    instantiating new ones on each call to Write. Prior to this commit we
    would instantiate as many encoders per Write call as the configured
    number of concurrent compression workers, even though the compressors
    are reusable across calls.
    
    ### Are these changes tested?
    I have added no tests and have not yet closely verified the patch.
    There's no functional change to test and I want to get agreement on the
    approach before solidifying it. I have not tested the lz4 change at all.
    * GitHub Issue: #43015
---
 go/arrow/ipc/file_writer.go |  6 +++++-
 go/arrow/ipc/ipc.go         |  6 +++++-
 go/arrow/ipc/writer.go      | 45 ++++++++++++++++++++++++++++++++++++++-------
 go/arrow/ipc/writer_test.go |  5 +++--
 4 files changed, 51 insertions(+), 11 deletions(-)

diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go
index cf795fa57d..8cea458192 100644
--- a/go/arrow/ipc/file_writer.go
+++ b/go/arrow/ipc/file_writer.go
@@ -278,6 +278,7 @@ type FileWriter struct {
        mapper          dictutils.Mapper
        codec           flatbuf.CompressionType
        compressNP      int
+       compressors     []compressor
        minSpaceSavings *float64
 
        // map of the last written dictionaries by id
@@ -302,6 +303,7 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) 
(*FileWriter, error) {
                codec:           cfg.codec,
                compressNP:      cfg.compressNP,
                minSpaceSavings: cfg.minSpaceSavings,
+               compressors:     make([]compressor, cfg.compressNP),
        }
 
        pos, err := f.w.Seek(0, io.SeekCurrent)
@@ -345,7 +347,9 @@ func (f *FileWriter) Write(rec arrow.Record) error {
        const allow64b = true
        var (
                data = Payload{msg: MessageRecordBatch}
-               enc  = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, 
f.codec, f.compressNP, f.minSpaceSavings)
+               enc  = newRecordEncoder(
+                       f.mem, 0, kMaxNestingDepth, allow64b, f.codec, 
f.compressNP, f.minSpaceSavings, f.compressors,
+               )
        )
        defer data.Release()
 
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 2398c8f7e7..b31a358a8a 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -79,6 +79,7 @@ func newConfig(opts ...Option) *config {
                alloc:              memory.NewGoAllocator(),
                codec:              -1, // uncompressed
                ensureNativeEndian: true,
+               compressNP:         1,
        }
 
        for _, opt := range opts {
@@ -132,9 +133,12 @@ func WithZstd() Option {
 // WithCompressConcurrency specifies a number of goroutines to spin up for
 // concurrent compression of the body buffers when writing compressed IPC 
records.
 // If n <= 1 then compression will be done serially without goroutine
-// parallelization. Default is 0.
+// parallelization. Default is 1.
 func WithCompressConcurrency(n int) Option {
        return func(cfg *config) {
+               if n <= 0 {
+                       n = 1
+               }
                cfg.compressNP = n
        }
 }
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index dfca2b9103..f2afd2db42 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -86,6 +86,7 @@ type Writer struct {
        mapper          dictutils.Mapper
        codec           flatbuf.CompressionType
        compressNP      int
+       compressors     []compressor
        minSpaceSavings *float64
 
        // map of the last written dictionaries by id
@@ -107,6 +108,7 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts 
...Option) *Writer {
                compressNP:      cfg.compressNP,
                minSpaceSavings: cfg.minSpaceSavings,
                emitDictDeltas:  cfg.emitDictDeltas,
+               compressors:     make([]compressor, cfg.compressNP),
        }
 }
 
@@ -120,6 +122,8 @@ func NewWriter(w io.Writer, opts ...Option) *Writer {
                schema:         cfg.schema,
                codec:          cfg.codec,
                emitDictDeltas: cfg.emitDictDeltas,
+               compressNP:     cfg.compressNP,
+               compressors:    make([]compressor, cfg.compressNP),
        }
 }
 
@@ -170,7 +174,16 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
        const allow64b = true
        var (
                data = Payload{msg: MessageRecordBatch}
-               enc  = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, 
w.codec, w.compressNP, w.minSpaceSavings)
+               enc  = newRecordEncoder(
+                       w.mem,
+                       0,
+                       kMaxNestingDepth,
+                       allow64b,
+                       w.codec,
+                       w.compressNP,
+                       w.minSpaceSavings,
+                       w.compressors,
+               )
        )
        defer data.Release()
 
@@ -310,10 +323,20 @@ type recordEncoder struct {
        allow64b        bool
        codec           flatbuf.CompressionType
        compressNP      int
+       compressors     []compressor
        minSpaceSavings *float64
 }
 
-func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, 
allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings 
*float64) *recordEncoder {
+func newRecordEncoder(
+       mem memory.Allocator,
+       startOffset,
+       maxDepth int64,
+       allow64b bool,
+       codec flatbuf.CompressionType,
+       compressNP int,
+       minSpaceSavings *float64,
+       compressors []compressor,
+) *recordEncoder {
        return &recordEncoder{
                mem:             mem,
                start:           startOffset,
@@ -321,6 +344,7 @@ func newRecordEncoder(mem memory.Allocator, startOffset, 
maxDepth int64, allow64
                allow64b:        allow64b,
                codec:           codec,
                compressNP:      compressNP,
+               compressors:     compressors,
                minSpaceSavings: minSpaceSavings,
        }
 }
@@ -340,6 +364,13 @@ func (w *recordEncoder) reset() {
        w.fields = make([]fieldMetadata, 0)
 }
 
+func (w *recordEncoder) getCompressor(id int) compressor {
+       if w.compressors[id] == nil {
+               w.compressors[id] = getCompressor(w.codec)
+       }
+       return w.compressors[id]
+}
+
 func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
        compress := func(idx int, codec compressor) error {
                if p.body[idx] == nil || p.body[idx].Len() == 0 {
@@ -378,7 +409,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
        }
 
        if w.compressNP <= 1 {
-               codec := getCompressor(w.codec)
+               codec := w.getCompressor(0)
                for idx := range p.body {
                        if err := compress(idx, codec); err != nil {
                                return err
@@ -395,11 +426,11 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
        )
        defer cancel()
 
-       for i := 0; i < w.compressNP; i++ {
+       for workerID := 0; workerID < w.compressNP; workerID++ {
                wg.Add(1)
-               go func() {
+               go func(id int) {
                        defer wg.Done()
-                       codec := getCompressor(w.codec)
+                       codec := w.getCompressor(id)
                        for {
                                select {
                                case idx, ok := <-ch:
@@ -418,7 +449,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
                                        return
                                }
                        }
-               }()
+               }(workerID)
        }
 
        for idx := range p.body {
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
index 5cab168f8f..4e519ed293 100644
--- a/go/arrow/ipc/writer_test.go
+++ b/go/arrow/ipc/writer_test.go
@@ -193,7 +193,8 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
        }
 
        for _, codec := range 
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME, 
flatbuf.CompressionTypeZSTD} {
-               enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil)
+               compressors := []compressor{getCompressor(codec)}
+               enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil, 
compressors)
                var payload Payload
                require.NoError(t, enc.encode(&payload, batch))
                assert.Len(t, payload.body, 2)
@@ -205,7 +206,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
                assert.Greater(t, compressedSize, int64(0))
                expectedSavings := 1.0 - 
float64(compressedSize)/float64(uncompressedSize)
 
-               compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, 
&expectedSavings)
+               compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, 
&expectedSavings, compressors)
                payload.Release()
                payload.body = payload.body[:0]
                require.NoError(t, compressEncoder.encode(&payload, batch))

Reply via email to