This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 864ba8221e GH-34385: [Go] Read IPC files with compression enabled but 
uncompressed buffers (#34476)
864ba8221e is described below

commit 864ba8221e0e852a9013d0692310c0f91b9b3b80
Author: Matt Topol <[email protected]>
AuthorDate: Tue Mar 7 12:18:36 2023 -0500

    GH-34385: [Go] Read IPC files with compression enabled but uncompressed 
buffers (#34476)
    
    
    
    ### Rationale for this change
    Fixes a bug in buffer compression: when a buffer is left uncompressed 
(because compressing it would not reduce its size), its length prefix must be 
set to -1 so readers know the body does not need to be decompressed.
    
    ### Are these changes tested?
    Unit tests added, and other tests will be enabled via integration tests in 
#15194
    
    * Closes: #34385
    
    Authored-by: Matt Topol <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/arrow/ipc/file_writer.go | 24 +++++++------
 go/arrow/ipc/ipc.go         | 21 +++++++++++
 go/arrow/ipc/writer.go      | 87 +++++++++++++++++++++++++++++++--------------
 go/arrow/ipc/writer_test.go | 70 ++++++++++++++++++++++++++++++++++++
 4 files changed, 164 insertions(+), 38 deletions(-)

diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go
index 169cef8fd1..20c82b964d 100644
--- a/go/arrow/ipc/file_writer.go
+++ b/go/arrow/ipc/file_writer.go
@@ -274,10 +274,11 @@ type FileWriter struct {
 
        pw PayloadWriter
 
-       schema     *arrow.Schema
-       mapper     dictutils.Mapper
-       codec      flatbuf.CompressionType
-       compressNP int
+       schema          *arrow.Schema
+       mapper          dictutils.Mapper
+       codec           flatbuf.CompressionType
+       compressNP      int
+       minSpaceSavings *float64
 
        // map of the last written dictionaries by id
        // so we can avoid writing the same dictionary over and over
@@ -294,12 +295,13 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option) 
(*FileWriter, error) {
        )
 
        f := FileWriter{
-               w:          w,
-               pw:         &pwriter{w: w, schema: cfg.schema, pos: -1},
-               mem:        cfg.alloc,
-               schema:     cfg.schema,
-               codec:      cfg.codec,
-               compressNP: cfg.compressNP,
+               w:               w,
+               pw:              &pwriter{w: w, schema: cfg.schema, pos: -1},
+               mem:             cfg.alloc,
+               schema:          cfg.schema,
+               codec:           cfg.codec,
+               compressNP:      cfg.compressNP,
+               minSpaceSavings: cfg.minSpaceSavings,
        }
 
        pos, err := f.w.Seek(0, io.SeekCurrent)
@@ -343,7 +345,7 @@ func (f *FileWriter) Write(rec arrow.Record) error {
        const allow64b = true
        var (
                data = Payload{msg: MessageRecordBatch}
-               enc  = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, 
f.codec, f.compressNP)
+               enc  = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b, 
f.codec, f.compressNP, f.minSpaceSavings)
        )
        defer data.Release()
 
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index ea1d1c81a1..e651a99367 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -71,6 +71,7 @@ type config struct {
        ensureNativeEndian bool
        noAutoSchema       bool
        emitDictDeltas     bool
+       minSpaceSavings    *float64
 }
 
 func newConfig(opts ...Option) *config {
@@ -168,6 +169,26 @@ func WithDictionaryDeltas(v bool) Option {
        }
 }
 
+// WithMinSpaceSavings specifies a percentage of space savings for
+// compression to be applied to buffers.
+//
+// Space savings is calculated as (1.0 - compressedSize / uncompressedSize).
+//
+// For example, if minSpaceSavings = 0.1, a 100-byte body buffer won't
+// undergo compression if its expected compressed size exceeds 90 bytes.
+// If this option is unset, compression will be used indiscriminately. If
+// no codec was supplied, this option is ignored.
+//
+// Values outside of the range [0,1] are handled as errors.
+//
+// Note that enabling this option may result in unreadable data for Arrow
+// Go and C++ versions prior to 12.0.0.
+func WithMinSpaceSavings(savings float64) Option {
+       return func(cfg *config) {
+               cfg.minSpaceSavings = &savings
+       }
+}
+
 var (
        _ arrio.Reader = (*Reader)(nil)
        _ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 77c29319fb..93c6d8df7e 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -80,11 +80,12 @@ type Writer struct {
        mem memory.Allocator
        pw  PayloadWriter
 
-       started    bool
-       schema     *arrow.Schema
-       mapper     dictutils.Mapper
-       codec      flatbuf.CompressionType
-       compressNP int
+       started         bool
+       schema          *arrow.Schema
+       mapper          dictutils.Mapper
+       codec           flatbuf.CompressionType
+       compressNP      int
+       minSpaceSavings *float64
 
        // map of the last written dictionaries by id
        // so we can avoid writing the same dictionary over and over
@@ -98,12 +99,13 @@ type Writer struct {
 func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
        cfg := newConfig(opts...)
        return &Writer{
-               mem:            cfg.alloc,
-               pw:             pw,
-               schema:         cfg.schema,
-               codec:          cfg.codec,
-               compressNP:     cfg.compressNP,
-               emitDictDeltas: cfg.emitDictDeltas,
+               mem:             cfg.alloc,
+               pw:              pw,
+               schema:          cfg.schema,
+               codec:           cfg.codec,
+               compressNP:      cfg.compressNP,
+               minSpaceSavings: cfg.minSpaceSavings,
+               emitDictDeltas:  cfg.emitDictDeltas,
        }
 }
 
@@ -167,7 +169,7 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
        const allow64b = true
        var (
                data = Payload{msg: MessageRecordBatch}
-               enc  = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, 
w.codec, w.compressNP)
+               enc  = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b, 
w.codec, w.compressNP, w.minSpaceSavings)
        )
        defer data.Release()
 
@@ -301,24 +303,36 @@ type recordEncoder struct {
        fields []fieldMetadata
        meta   []bufferMetadata
 
-       depth      int64
-       start      int64
-       allow64b   bool
-       codec      flatbuf.CompressionType
-       compressNP int
+       depth           int64
+       start           int64
+       allow64b        bool
+       codec           flatbuf.CompressionType
+       compressNP      int
+       minSpaceSavings *float64
 }
 
-func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, 
allow64b bool, codec flatbuf.CompressionType, compressNP int) *recordEncoder {
+func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64, 
allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings 
*float64) *recordEncoder {
        return &recordEncoder{
-               mem:        mem,
-               start:      startOffset,
-               depth:      maxDepth,
-               allow64b:   allow64b,
-               codec:      codec,
-               compressNP: compressNP,
+               mem:             mem,
+               start:           startOffset,
+               depth:           maxDepth,
+               allow64b:        allow64b,
+               codec:           codec,
+               compressNP:      compressNP,
+               minSpaceSavings: minSpaceSavings,
        }
 }
 
+func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
+       debug.Assert(uncompressed > 0, "uncompressed size is 0")
+       if w.minSpaceSavings == nil {
+               return true
+       }
+
+       savings := 1.0 - float64(compressed)/float64(uncompressed)
+       return savings >= *w.minSpaceSavings
+}
+
 func (w *recordEncoder) reset() {
        w.start = 0
        w.fields = make([]fieldMetadata, 0)
@@ -336,14 +350,26 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
                binary.LittleEndian.PutUint64(buf.Buf(), 
uint64(p.body[idx].Len()))
                bw := &bufferWriter{buf: buf, pos: arrow.Int64SizeBytes}
                codec.Reset(bw)
-               if _, err := codec.Write(p.body[idx].Bytes()); err != nil {
+
+               n, err := codec.Write(p.body[idx].Bytes())
+               if err != nil {
                        return err
                }
                if err := codec.Close(); err != nil {
                        return err
                }
 
-               buf.Resize(bw.pos)
+               finalLen := bw.pos
+               compressedLen := bw.pos - arrow.Int64SizeBytes
+               if !w.shouldCompress(n, compressedLen) {
+                       n = copy(buf.Buf()[arrow.Int64SizeBytes:], 
p.body[idx].Bytes())
+                       // size of -1 indicates to the reader that the body
+                       // doesn't need to be decompressed
+                       var noprefix int64 = -1
+                       binary.LittleEndian.PutUint64(buf.Buf(), 
uint64(noprefix))
+                       finalLen = n + arrow.Int64SizeBytes
+               }
+               bw.buf.Resize(finalLen)
                p.body[idx].Release()
                p.body[idx] = buf
                return nil
@@ -405,7 +431,6 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload) 
error {
 }
 
 func (w *recordEncoder) encode(p *Payload, rec arrow.Record) error {
-
        // perform depth-first traversal of the row-batch
        for i, col := range rec.Columns() {
                err := w.visit(p, col)
@@ -415,6 +440,14 @@ func (w *recordEncoder) encode(p *Payload, rec 
arrow.Record) error {
        }
 
        if w.codec != -1 {
+               if w.minSpaceSavings != nil {
+                       pct := *w.minSpaceSavings
+                       if pct < 0 || pct > 1 {
+                               p.Release()
+                               return fmt.Errorf("%w: minSpaceSavings not in 
range [0,1]. Provided %.05f",
+                                       arrow.ErrInvalid, pct)
+                       }
+               }
                w.compressBodyBuffers(p)
        }
 
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
index 1cbd440dfd..d494b7af2a 100644
--- a/go/arrow/ipc/writer_test.go
+++ b/go/arrow/ipc/writer_test.go
@@ -18,12 +18,16 @@ package ipc
 
 import (
        "bytes"
+       "encoding/binary"
        "fmt"
+       "math"
+       "strings"
        "testing"
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/bitutil"
+       "github.com/apache/arrow/go/v12/arrow/internal/flatbuf"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -166,3 +170,69 @@ func TestWriterMemCompression(t *testing.T) {
 
        require.NoError(t, w.Write(rec))
 }
+
+func TestWriteWithCompressionAndMinSavings(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       // a small batch that is known to be compressible
+       batch, _, err := array.RecordFromJSON(mem, 
arrow.NewSchema([]arrow.Field{
+               {Name: "n", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, 
nil),
+               strings.NewReader(`[
+                       {"n": 0}, {"n": 1}, {"n": 2}, {"n": 3}, {"n": 4},
+                       {"n": 5}, {"n": 6}, {"n": 7}, {"n": 8}, {"n": 9}]`))
+       require.NoError(t, err)
+       defer batch.Release()
+
+       prefixedSize := func(buf *memory.Buffer) int64 {
+               if buf.Len() < arrow.Int64SizeBytes {
+                       return 0
+               }
+               return int64(binary.LittleEndian.Uint64(buf.Bytes()))
+       }
+       contentSize := func(buf *memory.Buffer) int64 {
+               return int64(buf.Len()) - int64(arrow.Int64SizeBytes)
+       }
+
+       for _, codec := range 
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME, 
flatbuf.CompressionTypeZSTD} {
+               enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil)
+               var payload Payload
+               require.NoError(t, enc.encode(&payload, batch))
+               assert.Len(t, payload.body, 2)
+
+               // compute the savings when body buffers are compressed 
unconditionally.
+               // We also validate that our test batch is indeed compressible.
+               uncompressedSize, compressedSize := 
prefixedSize(payload.body[1]), contentSize(payload.body[1])
+               assert.Less(t, compressedSize, uncompressedSize)
+               assert.Greater(t, compressedSize, int64(0))
+               expectedSavings := 1.0 - 
float64(compressedSize)/float64(uncompressedSize)
+
+               compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1, 
&expectedSavings)
+               payload.Release()
+               payload.body = payload.body[:0]
+               require.NoError(t, compressEncoder.encode(&payload, batch))
+               assert.Len(t, payload.body, 2)
+               assert.Equal(t, uncompressedSize, prefixedSize(payload.body[1]))
+               assert.Equal(t, compressedSize, contentSize(payload.body[1]))
+
+               payload.Release()
+               payload.body = payload.body[:0]
+               // slightly bump the threshold. the body buffer should now be 
prefixed
+               // with -1 and its content left uncompressed
+               minSavings := math.Nextafter(expectedSavings, 1.0)
+               compressEncoder.minSpaceSavings = &minSavings
+               require.NoError(t, compressEncoder.encode(&payload, batch))
+               assert.Len(t, payload.body, 2)
+               assert.EqualValues(t, -1, prefixedSize(payload.body[1]))
+               assert.Equal(t, uncompressedSize, contentSize(payload.body[1]))
+               payload.Release()
+               payload.body = payload.body[:0]
+
+               for _, outOfRange := range []float64{math.Nextafter(1.0, 2.0), 
math.Nextafter(0, -1)} {
+                       compressEncoder.minSpaceSavings = &outOfRange
+                       err := compressEncoder.encode(&payload, batch)
+                       assert.ErrorIs(t, err, arrow.ErrInvalid)
+                       assert.ErrorContains(t, err, "minSpaceSavings not in 
range [0,1]")
+               }
+       }
+}
Reply via email to the mailing list.