This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new b0d8d82de6 GH-43015: [Go] Reuse column compressors in IPC module
(#43016)
b0d8d82de6 is described below
commit b0d8d82de65c55a2ec975ed39f01e3f0d5f9e7ec
Author: Wyatt Alt <[email protected]>
AuthorDate: Tue Jun 25 12:27:35 2024 -0700
GH-43015: [Go] Reuse column compressors in IPC module (#43016)
This changes the IPC writer to reuse lz4 or zstd compressors instead of
instantiating new ones on each call to Write. Prior to this commit we
would instantiate as many compressors per Write call as the configured
number of concurrent compression workers, even though the compressors
are reusable across calls.
### Are these changes tested?
I have added no tests and have not yet closely verified the patch.
There's no functional change to test and I want to get agreement on the
approach before solidifying it. I have not tested the lz4 change at all.
* GitHub Issue: #43015
---
go/arrow/ipc/file_writer.go | 6 +++++-
go/arrow/ipc/ipc.go | 6 +++++-
go/arrow/ipc/writer.go | 45 ++++++++++++++++++++++++++++++++++++++-------
go/arrow/ipc/writer_test.go | 5 +++--
4 files changed, 51 insertions(+), 11 deletions(-)
diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go
index cf795fa57d..8cea458192 100644
--- a/go/arrow/ipc/file_writer.go
+++ b/go/arrow/ipc/file_writer.go
@@ -278,6 +278,7 @@ type FileWriter struct {
mapper dictutils.Mapper
codec flatbuf.CompressionType
compressNP int
+ compressors []compressor
minSpaceSavings *float64
// map of the last written dictionaries by id
@@ -302,6 +303,7 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option)
(*FileWriter, error) {
codec: cfg.codec,
compressNP: cfg.compressNP,
minSpaceSavings: cfg.minSpaceSavings,
+ compressors: make([]compressor, cfg.compressNP),
}
pos, err := f.w.Seek(0, io.SeekCurrent)
@@ -345,7 +347,9 @@ func (f *FileWriter) Write(rec arrow.Record) error {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
- enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b,
f.codec, f.compressNP, f.minSpaceSavings)
+ enc = newRecordEncoder(
+ f.mem, 0, kMaxNestingDepth, allow64b, f.codec,
f.compressNP, f.minSpaceSavings, f.compressors,
+ )
)
defer data.Release()
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index 2398c8f7e7..b31a358a8a 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -79,6 +79,7 @@ func newConfig(opts ...Option) *config {
alloc: memory.NewGoAllocator(),
codec: -1, // uncompressed
ensureNativeEndian: true,
+ compressNP: 1,
}
for _, opt := range opts {
@@ -132,9 +133,12 @@ func WithZstd() Option {
// WithCompressConcurrency specifies a number of goroutines to spin up for
// concurrent compression of the body buffers when writing compress IPC
records.
// If n <= 1 then compression will be done serially without goroutine
-// parallelization. Default is 0.
+// parallelization. Default is 1.
func WithCompressConcurrency(n int) Option {
return func(cfg *config) {
+ if n <= 0 {
+ n = 1
+ }
cfg.compressNP = n
}
}
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index dfca2b9103..f2afd2db42 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -86,6 +86,7 @@ type Writer struct {
mapper dictutils.Mapper
codec flatbuf.CompressionType
compressNP int
+ compressors []compressor
minSpaceSavings *float64
// map of the last written dictionaries by id
@@ -107,6 +108,7 @@ func NewWriterWithPayloadWriter(pw PayloadWriter, opts
...Option) *Writer {
compressNP: cfg.compressNP,
minSpaceSavings: cfg.minSpaceSavings,
emitDictDeltas: cfg.emitDictDeltas,
+ compressors: make([]compressor, cfg.compressNP),
}
}
@@ -120,6 +122,8 @@ func NewWriter(w io.Writer, opts ...Option) *Writer {
schema: cfg.schema,
codec: cfg.codec,
emitDictDeltas: cfg.emitDictDeltas,
+ compressNP: cfg.compressNP,
+ compressors: make([]compressor, cfg.compressNP),
}
}
@@ -170,7 +174,16 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
- enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b,
w.codec, w.compressNP, w.minSpaceSavings)
+ enc = newRecordEncoder(
+ w.mem,
+ 0,
+ kMaxNestingDepth,
+ allow64b,
+ w.codec,
+ w.compressNP,
+ w.minSpaceSavings,
+ w.compressors,
+ )
)
defer data.Release()
@@ -310,10 +323,20 @@ type recordEncoder struct {
allow64b bool
codec flatbuf.CompressionType
compressNP int
+ compressors []compressor
minSpaceSavings *float64
}
-func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64,
allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings
*float64) *recordEncoder {
+func newRecordEncoder(
+ mem memory.Allocator,
+ startOffset,
+ maxDepth int64,
+ allow64b bool,
+ codec flatbuf.CompressionType,
+ compressNP int,
+ minSpaceSavings *float64,
+ compressors []compressor,
+) *recordEncoder {
return &recordEncoder{
mem: mem,
start: startOffset,
@@ -321,6 +344,7 @@ func newRecordEncoder(mem memory.Allocator, startOffset,
maxDepth int64, allow64
allow64b: allow64b,
codec: codec,
compressNP: compressNP,
+ compressors: compressors,
minSpaceSavings: minSpaceSavings,
}
}
@@ -340,6 +364,13 @@ func (w *recordEncoder) reset() {
w.fields = make([]fieldMetadata, 0)
}
+func (w *recordEncoder) getCompressor(id int) compressor {
+ if w.compressors[id] == nil {
+ w.compressors[id] = getCompressor(w.codec)
+ }
+ return w.compressors[id]
+}
+
func (w *recordEncoder) compressBodyBuffers(p *Payload) error {
compress := func(idx int, codec compressor) error {
if p.body[idx] == nil || p.body[idx].Len() == 0 {
@@ -378,7 +409,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload)
error {
}
if w.compressNP <= 1 {
- codec := getCompressor(w.codec)
+ codec := w.getCompressor(0)
for idx := range p.body {
if err := compress(idx, codec); err != nil {
return err
@@ -395,11 +426,11 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload)
error {
)
defer cancel()
- for i := 0; i < w.compressNP; i++ {
+ for workerID := 0; workerID < w.compressNP; workerID++ {
wg.Add(1)
- go func() {
+ go func(id int) {
defer wg.Done()
- codec := getCompressor(w.codec)
+ codec := w.getCompressor(id)
for {
select {
case idx, ok := <-ch:
@@ -418,7 +449,7 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload)
error {
return
}
}
- }()
+ }(workerID)
}
for idx := range p.body {
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
index 5cab168f8f..4e519ed293 100644
--- a/go/arrow/ipc/writer_test.go
+++ b/go/arrow/ipc/writer_test.go
@@ -193,7 +193,8 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
}
for _, codec := range
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME,
flatbuf.CompressionTypeZSTD} {
- enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil)
+ compressors := []compressor{getCompressor(codec)}
+ enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil,
compressors)
var payload Payload
require.NoError(t, enc.encode(&payload, batch))
assert.Len(t, payload.body, 2)
@@ -205,7 +206,7 @@ func TestWriteWithCompressionAndMinSavings(t *testing.T) {
assert.Greater(t, compressedSize, int64(0))
expectedSavings := 1.0 -
float64(compressedSize)/float64(uncompressedSize)
- compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1,
&expectedSavings)
+ compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1,
&expectedSavings, compressors)
payload.Release()
payload.body = payload.body[:0]
require.NoError(t, compressEncoder.encode(&payload, batch))