This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 864ba8221e GH-34385: [Go] Read IPC files with compression enabled but
uncompressed buffers (#34476)
864ba8221e is described below
commit 864ba8221e0e852a9013d0692310c0f91b9b3b80
Author: Matt Topol <[email protected]>
AuthorDate: Tue Mar 7 12:18:36 2023 -0500
GH-34385: [Go] Read IPC files with compression enabled but uncompressed
buffers (#34476)
### Rationale for this change
Fixes a bug in buffer compression: when a buffer is left uncompressed
(because compression would not save enough space), its length prefix must be
set to -1 so that readers know the body does not need decompression.
### Are these changes tested?
Unit tests added, and other tests will be enabled via integration tests in
#15194
* Closes: #34385
Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/ipc/file_writer.go | 24 +++++++------
go/arrow/ipc/ipc.go | 21 +++++++++++
go/arrow/ipc/writer.go | 87 +++++++++++++++++++++++++++++++--------------
go/arrow/ipc/writer_test.go | 70 ++++++++++++++++++++++++++++++++++++
4 files changed, 164 insertions(+), 38 deletions(-)
diff --git a/go/arrow/ipc/file_writer.go b/go/arrow/ipc/file_writer.go
index 169cef8fd1..20c82b964d 100644
--- a/go/arrow/ipc/file_writer.go
+++ b/go/arrow/ipc/file_writer.go
@@ -274,10 +274,11 @@ type FileWriter struct {
pw PayloadWriter
- schema *arrow.Schema
- mapper dictutils.Mapper
- codec flatbuf.CompressionType
- compressNP int
+ schema *arrow.Schema
+ mapper dictutils.Mapper
+ codec flatbuf.CompressionType
+ compressNP int
+ minSpaceSavings *float64
// map of the last written dictionaries by id
// so we can avoid writing the same dictionary over and over
@@ -294,12 +295,13 @@ func NewFileWriter(w io.WriteSeeker, opts ...Option)
(*FileWriter, error) {
)
f := FileWriter{
- w: w,
- pw: &pwriter{w: w, schema: cfg.schema, pos: -1},
- mem: cfg.alloc,
- schema: cfg.schema,
- codec: cfg.codec,
- compressNP: cfg.compressNP,
+ w: w,
+ pw: &pwriter{w: w, schema: cfg.schema, pos: -1},
+ mem: cfg.alloc,
+ schema: cfg.schema,
+ codec: cfg.codec,
+ compressNP: cfg.compressNP,
+ minSpaceSavings: cfg.minSpaceSavings,
}
pos, err := f.w.Seek(0, io.SeekCurrent)
@@ -343,7 +345,7 @@ func (f *FileWriter) Write(rec arrow.Record) error {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
- enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b,
f.codec, f.compressNP)
+ enc = newRecordEncoder(f.mem, 0, kMaxNestingDepth, allow64b,
f.codec, f.compressNP, f.minSpaceSavings)
)
defer data.Release()
diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go
index ea1d1c81a1..e651a99367 100644
--- a/go/arrow/ipc/ipc.go
+++ b/go/arrow/ipc/ipc.go
@@ -71,6 +71,7 @@ type config struct {
ensureNativeEndian bool
noAutoSchema bool
emitDictDeltas bool
+ minSpaceSavings *float64
}
func newConfig(opts ...Option) *config {
@@ -168,6 +169,26 @@ func WithDictionaryDeltas(v bool) Option {
}
}
+// WithMinSpaceSavings specifies a percentage of space savings for
+// compression to be applied to buffers.
+//
+// Space savings is calculated as (1.0 - compressedSize / uncompressedSize).
+//
+// For example, if minSpaceSavings = 0.1, a 100-byte body buffer won't
+// undergo compression if its expected compressed size exceeds 90 bytes.
+// If this option is unset, compression will be used indiscriminately. If
+// no codec was supplied, this option is ignored.
+//
+// Values outside of the range [0,1] are handled as errors.
+//
+// Note that enabling this option may result in unreadable data for Arrow
+// Go and C++ versions prior to 12.0.0.
+func WithMinSpaceSavings(savings float64) Option {
+ return func(cfg *config) {
+ cfg.minSpaceSavings = &savings
+ }
+}
+
var (
_ arrio.Reader = (*Reader)(nil)
_ arrio.Writer = (*Writer)(nil)
diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go
index 77c29319fb..93c6d8df7e 100644
--- a/go/arrow/ipc/writer.go
+++ b/go/arrow/ipc/writer.go
@@ -80,11 +80,12 @@ type Writer struct {
mem memory.Allocator
pw PayloadWriter
- started bool
- schema *arrow.Schema
- mapper dictutils.Mapper
- codec flatbuf.CompressionType
- compressNP int
+ started bool
+ schema *arrow.Schema
+ mapper dictutils.Mapper
+ codec flatbuf.CompressionType
+ compressNP int
+ minSpaceSavings *float64
// map of the last written dictionaries by id
// so we can avoid writing the same dictionary over and over
@@ -98,12 +99,13 @@ type Writer struct {
func NewWriterWithPayloadWriter(pw PayloadWriter, opts ...Option) *Writer {
cfg := newConfig(opts...)
return &Writer{
- mem: cfg.alloc,
- pw: pw,
- schema: cfg.schema,
- codec: cfg.codec,
- compressNP: cfg.compressNP,
- emitDictDeltas: cfg.emitDictDeltas,
+ mem: cfg.alloc,
+ pw: pw,
+ schema: cfg.schema,
+ codec: cfg.codec,
+ compressNP: cfg.compressNP,
+ minSpaceSavings: cfg.minSpaceSavings,
+ emitDictDeltas: cfg.emitDictDeltas,
}
}
@@ -167,7 +169,7 @@ func (w *Writer) Write(rec arrow.Record) (err error) {
const allow64b = true
var (
data = Payload{msg: MessageRecordBatch}
- enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b,
w.codec, w.compressNP)
+ enc = newRecordEncoder(w.mem, 0, kMaxNestingDepth, allow64b,
w.codec, w.compressNP, w.minSpaceSavings)
)
defer data.Release()
@@ -301,24 +303,36 @@ type recordEncoder struct {
fields []fieldMetadata
meta []bufferMetadata
- depth int64
- start int64
- allow64b bool
- codec flatbuf.CompressionType
- compressNP int
+ depth int64
+ start int64
+ allow64b bool
+ codec flatbuf.CompressionType
+ compressNP int
+ minSpaceSavings *float64
}
-func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64,
allow64b bool, codec flatbuf.CompressionType, compressNP int) *recordEncoder {
+func newRecordEncoder(mem memory.Allocator, startOffset, maxDepth int64,
allow64b bool, codec flatbuf.CompressionType, compressNP int, minSpaceSavings
*float64) *recordEncoder {
return &recordEncoder{
- mem: mem,
- start: startOffset,
- depth: maxDepth,
- allow64b: allow64b,
- codec: codec,
- compressNP: compressNP,
+ mem: mem,
+ start: startOffset,
+ depth: maxDepth,
+ allow64b: allow64b,
+ codec: codec,
+ compressNP: compressNP,
+ minSpaceSavings: minSpaceSavings,
}
}
+func (w *recordEncoder) shouldCompress(uncompressed, compressed int) bool {
+ debug.Assert(uncompressed > 0, "uncompressed size is 0")
+ if w.minSpaceSavings == nil {
+ return true
+ }
+
+ savings := 1.0 - float64(compressed)/float64(uncompressed)
+ return savings >= *w.minSpaceSavings
+}
+
func (w *recordEncoder) reset() {
w.start = 0
w.fields = make([]fieldMetadata, 0)
@@ -336,14 +350,26 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload)
error {
binary.LittleEndian.PutUint64(buf.Buf(),
uint64(p.body[idx].Len()))
bw := &bufferWriter{buf: buf, pos: arrow.Int64SizeBytes}
codec.Reset(bw)
- if _, err := codec.Write(p.body[idx].Bytes()); err != nil {
+
+ n, err := codec.Write(p.body[idx].Bytes())
+ if err != nil {
return err
}
if err := codec.Close(); err != nil {
return err
}
- buf.Resize(bw.pos)
+ finalLen := bw.pos
+ compressedLen := bw.pos - arrow.Int64SizeBytes
+ if !w.shouldCompress(n, compressedLen) {
+ n = copy(buf.Buf()[arrow.Int64SizeBytes:],
p.body[idx].Bytes())
+ // size of -1 indicates to the reader that the body
+ // doesn't need to be decompressed
+ var noprefix int64 = -1
+ binary.LittleEndian.PutUint64(buf.Buf(),
uint64(noprefix))
+ finalLen = n + arrow.Int64SizeBytes
+ }
+ bw.buf.Resize(finalLen)
p.body[idx].Release()
p.body[idx] = buf
return nil
@@ -405,7 +431,6 @@ func (w *recordEncoder) compressBodyBuffers(p *Payload)
error {
}
func (w *recordEncoder) encode(p *Payload, rec arrow.Record) error {
-
// perform depth-first traversal of the row-batch
for i, col := range rec.Columns() {
err := w.visit(p, col)
@@ -415,6 +440,14 @@ func (w *recordEncoder) encode(p *Payload, rec
arrow.Record) error {
}
if w.codec != -1 {
+ if w.minSpaceSavings != nil {
+ pct := *w.minSpaceSavings
+ if pct < 0 || pct > 1 {
+ p.Release()
+ return fmt.Errorf("%w: minSpaceSavings not in
range [0,1]. Provided %.05f",
+ arrow.ErrInvalid, pct)
+ }
+ }
w.compressBodyBuffers(p)
}
diff --git a/go/arrow/ipc/writer_test.go b/go/arrow/ipc/writer_test.go
index 1cbd440dfd..d494b7af2a 100644
--- a/go/arrow/ipc/writer_test.go
+++ b/go/arrow/ipc/writer_test.go
@@ -18,12 +18,16 @@ package ipc
import (
"bytes"
+ "encoding/binary"
"fmt"
+ "math"
+ "strings"
"testing"
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/bitutil"
+ "github.com/apache/arrow/go/v12/arrow/internal/flatbuf"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -166,3 +170,69 @@ func TestWriterMemCompression(t *testing.T) {
require.NoError(t, w.Write(rec))
}
+
+func TestWriteWithCompressionAndMinSavings(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ // a small batch that is known to be compressible
+ batch, _, err := array.RecordFromJSON(mem,
arrow.NewSchema([]arrow.Field{
+ {Name: "n", Type: arrow.PrimitiveTypes.Int64, Nullable: true}},
nil),
+ strings.NewReader(`[
+ {"n": 0}, {"n": 1}, {"n": 2}, {"n": 3}, {"n": 4},
+ {"n": 5}, {"n": 6}, {"n": 7}, {"n": 8}, {"n": 9}]`))
+ require.NoError(t, err)
+ defer batch.Release()
+
+ prefixedSize := func(buf *memory.Buffer) int64 {
+ if buf.Len() < arrow.Int64SizeBytes {
+ return 0
+ }
+ return int64(binary.LittleEndian.Uint64(buf.Bytes()))
+ }
+ contentSize := func(buf *memory.Buffer) int64 {
+ return int64(buf.Len()) - int64(arrow.Int64SizeBytes)
+ }
+
+ for _, codec := range
[]flatbuf.CompressionType{flatbuf.CompressionTypeLZ4_FRAME,
flatbuf.CompressionTypeZSTD} {
+ enc := newRecordEncoder(mem, 0, 5, true, codec, 1, nil)
+ var payload Payload
+ require.NoError(t, enc.encode(&payload, batch))
+ assert.Len(t, payload.body, 2)
+
+ // compute the savings when body buffers are compressed
unconditionally.
+ // We also validate that our test batch is indeed compressible.
+ uncompressedSize, compressedSize :=
prefixedSize(payload.body[1]), contentSize(payload.body[1])
+ assert.Less(t, compressedSize, uncompressedSize)
+ assert.Greater(t, compressedSize, int64(0))
+ expectedSavings := 1.0 -
float64(compressedSize)/float64(uncompressedSize)
+
+ compressEncoder := newRecordEncoder(mem, 0, 5, true, codec, 1,
&expectedSavings)
+ payload.Release()
+ payload.body = payload.body[:0]
+ require.NoError(t, compressEncoder.encode(&payload, batch))
+ assert.Len(t, payload.body, 2)
+ assert.Equal(t, uncompressedSize, prefixedSize(payload.body[1]))
+ assert.Equal(t, compressedSize, contentSize(payload.body[1]))
+
+ payload.Release()
+ payload.body = payload.body[:0]
+ // slightly bump the threshold. the body buffer should now be
prefixed
+ // with -1 and its content left uncompressed
+ minSavings := math.Nextafter(expectedSavings, 1.0)
+ compressEncoder.minSpaceSavings = &minSavings
+ require.NoError(t, compressEncoder.encode(&payload, batch))
+ assert.Len(t, payload.body, 2)
+ assert.EqualValues(t, -1, prefixedSize(payload.body[1]))
+ assert.Equal(t, uncompressedSize, contentSize(payload.body[1]))
+ payload.Release()
+ payload.body = payload.body[:0]
+
+ for _, outOfRange := range []float64{math.Nextafter(1.0, 2.0),
math.Nextafter(0, -1)} {
+ compressEncoder.minSpaceSavings = &outOfRange
+ err := compressEncoder.encode(&payload, batch)
+ assert.ErrorIs(t, err, arrow.ErrInvalid)
+ assert.ErrorContains(t, err, "minSpaceSavings not in
range [0,1]")
+ }
+ }
+}