This is an automated email from the ASF dual-hosted git repository.
joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 6502f0e3ad GH-43790: [Go][Parquet] Add support for LZ4_RAW compression codec (#43835)
6502f0e3ad is described below
commit 6502f0e3ad046d361aba44385ab3379ed7af5b7f
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Tue Aug 27 13:17:39 2024 -0400
GH-43790: [Go][Parquet] Add support for LZ4_RAW compression codec (#43835)
### Rationale for this change
Fixes: #43790
The LZ4 compression codec for Parquet is no longer ambiguous, as it has been superseded by the [LZ4_RAW](https://github.com/apache/parquet-format/blob/master/Compression.md#lz4_raw) spec.
### What changes are included in this PR?
- Add `LZ4Raw` compression codec (a one-shot usage sketch follows this list)
- Split out `StreamingCodec` methods from core `Codec` interface
- Various conformance/roundtrip tests
- Set of benchmarks for reading/writing an Arrow table to/from Parquet, using each compression codec
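
A rough sketch of how the new codec's one-shot API can be exercised (mirroring what the conformance tests do; the repetitive sample payload and abbreviated error handling are illustrative only):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/apache/arrow/go/v18/parquet/compress"
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Lz4Raw)
	if err != nil {
		panic(err)
	}

	src := bytes.Repeat([]byte("some highly repetitive data. "), 64)

	// CompressBound reports the worst-case compressed size, so a dst of
	// that capacity is always large enough for Encode.
	dst := make([]byte, codec.CompressBound(int64(len(src))))
	compressed := codec.Encode(dst, src)

	// Decode needs dst sized for the uncompressed output; Parquet page
	// headers carry the uncompressed size, so readers know it up front.
	roundtrip := codec.Decode(make([]byte, len(src)), compressed)
	fmt.Println(bytes.Equal(src, roundtrip), len(compressed))
}
```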
### Are these changes tested?
Yes
### Are there any user-facing changes?
- New codec `LZ4Raw` is available
- `Codec` interface no longer provides the following methods, which are now part of `StreamingCodec` (see the migration sketch below):
  - `NewReader`
  - `NewWriter`
  - `NewWriterLevel`
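
For callers that used the streaming methods directly, a minimal migration sketch (based on the updated `compress_test.go`; error handling abbreviated):

```go
package main

import (
	"bytes"
	"io"

	"github.com/apache/arrow/go/v18/parquet/compress"
)

func main() {
	codec, err := compress.GetCodec(compress.Codecs.Gzip)
	if err != nil {
		panic(err)
	}

	// The streaming methods now live on StreamingCodec, so callers that
	// previously called codec.NewWriter / codec.NewReader assert to it first.
	streamingCodec, ok := codec.(compress.StreamingCodec)
	if !ok {
		panic("codec does not expose a streaming API")
	}

	var buf bytes.Buffer
	wr := streamingCodec.NewWriter(&buf)
	if _, err := wr.Write([]byte("streamed payload")); err != nil {
		panic(err)
	}
	wr.Close()

	rdr := streamingCodec.NewReader(&buf)
	out, err := io.ReadAll(rdr)
	if err != nil {
		panic(err)
	}
	_ = out
}
```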
* GitHub Issue: #43790
Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Joel Lubinitsky <[email protected]>
---
go/parquet/compress/compress.go | 22 ++++--
go/parquet/compress/compress_test.go | 8 +-
go/parquet/compress/lz4_raw.go | 66 ++++++++++++++++
go/parquet/file/file_reader_test.go | 127 +++++++++++++++++++++++++++++++
go/parquet/file/file_writer_test.go | 58 +++++++++++++-
go/parquet/pqarrow/reader_writer_test.go | 111 +++++++++++++++++++++++++++
6 files changed, 380 insertions(+), 12 deletions(-)
diff --git a/go/parquet/compress/compress.go b/go/parquet/compress/compress.go
index b6a1349133..92f2ae99bb 100644
--- a/go/parquet/compress/compress.go
+++ b/go/parquet/compress/compress.go
@@ -49,8 +49,9 @@ var Codecs = struct {
Brotli Compression
// LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
// see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E
- Lz4 Compression
- Zstd Compression
+ Lz4 Compression
+ Zstd Compression
+ Lz4Raw Compression
}{
Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
Snappy: Compression(parquet.CompressionCodec_SNAPPY),
@@ -59,17 +60,12 @@ var Codecs = struct {
Brotli: Compression(parquet.CompressionCodec_BROTLI),
Lz4: Compression(parquet.CompressionCodec_LZ4),
Zstd: Compression(parquet.CompressionCodec_ZSTD),
+ Lz4Raw: Compression(parquet.CompressionCodec_LZ4_RAW),
}
// Codec is an interface which is implemented for each compression type in order to make the interactions easy to
// implement. Most consumers won't be calling GetCodec directly.
type Codec interface {
- // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
- NewReader(io.Reader) io.ReadCloser
- // NewWriter provides a wrapper around a write stream to compress data before writing it.
- NewWriter(io.Writer) io.WriteCloser
- // NewWriterLevel is like NewWriter but allows specifying the compression level
- NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
// Encode encodes a block of data given by src and returns the compressed block. dst should be either nil
// or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
// overlap since some of the compression types don't allow it.
@@ -90,6 +86,16 @@ type Codec interface {
Decode(dst, src []byte) []byte
}
+// StreamingCodec is an interface that may be implemented for compression codecs that expose a streaming API.
+type StreamingCodec interface {
+ // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
+ NewReader(io.Reader) io.ReadCloser
+ // NewWriter provides a wrapper around a write stream to compress data before writing it.
+ NewWriter(io.Writer) io.WriteCloser
+ // NewWriterLevel is like NewWriter but allows specifying the compression level
+ NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
+}
+
var codecs = map[Compression]Codec{}
// RegisterCodec adds or overrides a codec implementation for a given compression algorithm.
diff --git a/go/parquet/compress/compress_test.go b/go/parquet/compress/compress_test.go
index 843062c0d0..5aac74759e 100644
--- a/go/parquet/compress/compress_test.go
+++ b/go/parquet/compress/compress_test.go
@@ -66,8 +66,8 @@ func TestCompressDataOneShot(t *testing.T) {
{compress.Codecs.Gzip},
{compress.Codecs.Brotli},
{compress.Codecs.Zstd},
+ {compress.Codecs.Lz4Raw},
// {compress.Codecs.Lzo},
- // {compress.Codecs.Lz4},
}
for _, tt := range tests {
@@ -107,9 +107,11 @@ func TestCompressReaderWriter(t *testing.T) {
var buf bytes.Buffer
codec, err := compress.GetCodec(tt.c)
assert.NoError(t, err)
+ streamingCodec, ok := codec.(compress.StreamingCodec)
+ assert.True(t, ok)
data := makeRandomData(RandomDataSize)
- wr := codec.NewWriter(&buf)
+ wr := streamingCodec.NewWriter(&buf)
const chunkSize = 1111
input := data
@@ -129,7 +131,7 @@ func TestCompressReaderWriter(t *testing.T) {
}
wr.Close()
- rdr := codec.NewReader(&buf)
+ rdr := streamingCodec.NewReader(&buf)
out, err := io.ReadAll(rdr)
assert.NoError(t, err)
assert.Exactly(t, data, out)
diff --git a/go/parquet/compress/lz4_raw.go b/go/parquet/compress/lz4_raw.go
new file mode 100644
index 0000000000..788d9520a6
--- /dev/null
+++ b/go/parquet/compress/lz4_raw.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+ "sync"
+
+ "github.com/pierrec/lz4/v4"
+)
+
+// lz4.Compressor is not goroutine-safe, so we use a pool to amortize the cost
+// of allocating a new one for each call to Encode().
+var compressorPool = sync.Pool{New: func() interface{} { return new(lz4.Compressor) }}
+
+func compressBlock(src, dst []byte) (int, error) {
+ c := compressorPool.Get().(*lz4.Compressor)
+ defer compressorPool.Put(c)
+ return c.CompressBlock(src, dst)
+}
+
+type lz4RawCodec struct{}
+
+func (c lz4RawCodec) Encode(dst, src []byte) []byte {
+ n, err := compressBlock(src, dst[:cap(dst)])
+ if err != nil {
+ panic(err)
+ }
+
+ return dst[:n]
+}
+
+func (c lz4RawCodec) EncodeLevel(dst, src []byte, _ int) []byte {
+ // the lz4 block implementation does not allow level to be set
+ return c.Encode(dst, src)
+}
+
+func (lz4RawCodec) Decode(dst, src []byte) []byte {
+ n, err := lz4.UncompressBlock(src, dst)
+ if err != nil {
+ panic(err)
+ }
+
+ return dst[:n]
+}
+
+func (c lz4RawCodec) CompressBound(len int64) int64 {
+ return int64(lz4.CompressBlockBound(int(len)))
+}
+
+func init() {
+ RegisterCodec(Codecs.Lz4Raw, lz4RawCodec{})
+}
diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go
index 547ec475c2..35f4da4e86 100644
--- a/go/parquet/file/file_reader_test.go
+++ b/go/parquet/file/file_reader_test.go
@@ -644,3 +644,130 @@ func TestDeltaBinaryPackedMultipleBatches(t *testing.T) {
require.Equalf(t, size, totalRows, "Expected %d rows, but got %d rows", size, totalRows)
}
+
+// Test read file lz4_raw_compressed.parquet
+// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawFileRead(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ t.Skip("no path supplied with PARQUET_TEST_DATA")
+ }
+ require.DirExists(t, dir)
+
+ props := parquet.NewReaderProperties(memory.DefaultAllocator)
+ fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed.parquet"),
+ false, file.WithReadProps(props))
+ require.NoError(t, err)
+ defer fileReader.Close()
+
+ nRows := 4
+ nCols := 3
+ require.Equal(t, 1, fileReader.NumRowGroups())
+ rgr := fileReader.RowGroup(0)
+ require.EqualValues(t, nRows, rgr.NumRows())
+ require.EqualValues(t, nCols, rgr.NumColumns())
+
+ rdr, err := rgr.Column(0)
+ require.NoError(t, err)
+
+ rowsInt64, ok := rdr.(*file.Int64ColumnChunkReader)
+ require.True(t, ok)
+
+ valsInt64 := make([]int64, nRows)
+ total, read, err := rowsInt64.ReadBatch(int64(nRows), valsInt64, nil, nil)
+ require.NoError(t, err)
+ require.Equal(t, int64(nRows), total)
+ require.Equal(t, nRows, read)
+
+ expectedValsInt64 := []int64{
+ 1593604800,
+ 1593604800,
+ 1593604801,
+ 1593604801,
+ }
+ require.Equal(t, expectedValsInt64, valsInt64)
+
+ rdr, err = rgr.Column(1)
+ require.NoError(t, err)
+
+ rowsByteArray, ok := rdr.(*file.ByteArrayColumnChunkReader)
+ require.True(t, ok)
+
+ valsByteArray := make([]parquet.ByteArray, nRows)
+ total, read, err = rowsByteArray.ReadBatch(int64(nRows), valsByteArray, nil, nil)
+ require.NoError(t, err)
+ require.Equal(t, int64(nRows), total)
+ require.Equal(t, nRows, read)
+
+ expectedValsByteArray := []parquet.ByteArray{
+ []byte("abc"),
+ []byte("def"),
+ []byte("abc"),
+ []byte("def"),
+ }
+ require.Equal(t, expectedValsByteArray, valsByteArray)
+
+ rdr, err = rgr.Column(2)
+ require.NoError(t, err)
+
+ rowsFloat64, ok := rdr.(*file.Float64ColumnChunkReader)
+ require.True(t, ok)
+
+ valsFloat64 := make([]float64, nRows)
+ total, read, err = rowsFloat64.ReadBatch(int64(nRows), valsFloat64, nil, nil)
+ require.NoError(t, err)
+ require.Equal(t, int64(nRows), total)
+ require.Equal(t, nRows, read)
+
+ expectedValsFloat64 := []float64{
+ 42.0,
+ 7.7,
+ 42.125,
+ 7.7,
+ }
+ require.Equal(t, expectedValsFloat64, valsFloat64)
+}
+
+// Test read file lz4_raw_compressed_larger.parquet
+// Contents documented at https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawLargerFileRead(t *testing.T) {
+ dir := os.Getenv("PARQUET_TEST_DATA")
+ if dir == "" {
+ t.Skip("no path supplied with PARQUET_TEST_DATA")
+ }
+ require.DirExists(t, dir)
+
+ props := parquet.NewReaderProperties(memory.DefaultAllocator)
+ fileReader, err := file.OpenParquetFile(path.Join(dir, "lz4_raw_compressed_larger.parquet"),
+ false, file.WithReadProps(props))
+ require.NoError(t, err)
+ defer fileReader.Close()
+
+ nRows := 10000
+ nCols := 1
+ require.Equal(t, 1, fileReader.NumRowGroups())
+ rgr := fileReader.RowGroup(0)
+ require.EqualValues(t, nRows, rgr.NumRows())
+ require.EqualValues(t, nCols, rgr.NumColumns())
+
+ rdr, err := rgr.Column(0)
+ require.NoError(t, err)
+
+ rows, ok := rdr.(*file.ByteArrayColumnChunkReader)
+ require.True(t, ok)
+
+ vals := make([]parquet.ByteArray, nRows)
+ total, read, err := rows.ReadBatch(int64(nRows), vals, nil, nil)
+ require.NoError(t, err)
+ require.Equal(t, int64(nRows), total)
+ require.Equal(t, nRows, read)
+
+ expectedValsHead := []parquet.ByteArray{
+ []byte("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"),
+ []byte("e8fb9197-cb9f-4118-b67f-fbfa65f61843"),
+ []byte("885136e1-0aa1-4fdb-8847-63d87b07c205"),
+ []byte("ce7b2019-8ebe-4906-a74d-0afa2409e5df"),
+ []byte("a9ee2527-821b-4b71-a926-03f73c3fc8b7"),
+ }
+ require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)])
+}
diff --git a/go/parquet/file/file_writer_test.go b/go/parquet/file/file_writer_test.go
index 0faf3f7233..12ac93d1ef 100644
--- a/go/parquet/file/file_writer_test.go
+++ b/go/parquet/file/file_writer_test.go
@@ -260,7 +260,7 @@ func (t *SerializeTestSuite) TestSmallFile() {
compress.Codecs.Brotli,
compress.Codecs.Gzip,
compress.Codecs.Zstd,
- // compress.Codecs.Lz4,
+ compress.Codecs.Lz4Raw,
// compress.Codecs.Lzo,
}
for _, c := range codecs {
@@ -540,3 +540,59 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) {
require.NoError(t, rdr.Close())
}
+
+func TestLZ4RawFileRoundtrip(t *testing.T) {
+ input := []int64{
+ -1, 0, 1, 2, 3, 4, 5, 123456789, -123456789,
+ }
+
+ size := len(input)
+
+ field, err := schema.NewPrimitiveNodeLogical("int64", parquet.Repetitions.Required, nil, parquet.Types.Int64, 0, 1)
+ require.NoError(t, err)
+
+ schema, err := schema.NewGroupNode("test", parquet.Repetitions.Required, schema.FieldList{field}, 0)
+ require.NoError(t, err)
+
+ sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ writer := file.NewParquetWriter(sink, schema, file.WithWriterProps(parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))))
+
+ rgw := writer.AppendRowGroup()
+ cw, err := rgw.NextColumn()
+ require.NoError(t, err)
+
+ i64ColumnWriter, ok := cw.(*file.Int64ColumnChunkWriter)
+ require.True(t, ok)
+
+ nVals, err := i64ColumnWriter.WriteBatch(input, nil, nil)
+ require.NoError(t, err)
+ require.EqualValues(t, size, nVals)
+
+ require.NoError(t, cw.Close())
+ require.NoError(t, rgw.Close())
+ require.NoError(t, writer.Close())
+
+ rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
+ require.NoError(t, err)
+
+ require.Equal(t, 1, rdr.NumRowGroups())
+ require.EqualValues(t, size, rdr.NumRows())
+
+ rgr := rdr.RowGroup(0)
+ cr, err := rgr.Column(0)
+ require.NoError(t, err)
+
+ i64ColumnReader, ok := cr.(*file.Int64ColumnChunkReader)
+ require.True(t, ok)
+
+ output := make([]int64, size)
+
+ total, valuesRead, err := i64ColumnReader.ReadBatch(int64(size), output, nil, nil)
+ require.NoError(t, err)
+ require.EqualValues(t, size, total)
+ require.EqualValues(t, size, valuesRead)
+
+ require.Equal(t, input, output)
+
+ require.NoError(t, rdr.Close())
+}
diff --git a/go/parquet/pqarrow/reader_writer_test.go b/go/parquet/pqarrow/reader_writer_test.go
index 31bd0eba84..e020c7d945 100644
--- a/go/parquet/pqarrow/reader_writer_test.go
+++ b/go/parquet/pqarrow/reader_writer_test.go
@@ -19,6 +19,8 @@ package pqarrow_test
import (
"bytes"
"context"
+ "fmt"
+ "math"
"testing"
"unsafe"
@@ -26,8 +28,10 @@ import (
"github.com/apache/arrow/go/v18/arrow/array"
"github.com/apache/arrow/go/v18/arrow/memory"
"github.com/apache/arrow/go/v18/parquet"
+ "github.com/apache/arrow/go/v18/parquet/compress"
"github.com/apache/arrow/go/v18/parquet/file"
"github.com/apache/arrow/go/v18/parquet/pqarrow"
+ "github.com/stretchr/testify/require"
"golang.org/x/exp/rand"
"gonum.org/v1/gonum/stat/distuv"
)
@@ -275,3 +279,110 @@ func BenchmarkReadColumnFloat64(b *testing.B) {
benchReadTable(b, tt.name, tbl, int64(arrow.Int32Traits.BytesRequired(SIZELEN)))
}
}
+
+var compressTestCases = []struct {
+ c compress.Compression
+}{
+ {compress.Codecs.Uncompressed},
+ {compress.Codecs.Snappy},
+ {compress.Codecs.Gzip},
+ {compress.Codecs.Brotli},
+ {compress.Codecs.Zstd},
+ {compress.Codecs.Lz4Raw},
+ // {compress.Codecs.Lzo},
+}
+
+func buildTableForTest(mem memory.Allocator) arrow.Table {
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "int64s", Type: arrow.PrimitiveTypes.Int64},
+ {Name: "strings", Type: arrow.BinaryTypes.String},
+ {Name: "bools", Type: arrow.FixedWidthTypes.Boolean},
+ {Name: "repeated_int64s", Type:
arrow.PrimitiveTypes.Int64},
+ {Name: "repeated_strings", Type:
arrow.BinaryTypes.String},
+ {Name: "repeated_bools", Type:
arrow.FixedWidthTypes.Boolean},
+ },
+ nil,
+ )
+ bldr := array.NewRecordBuilder(mem, schema)
+ defer bldr.Release()
+
+ for i := 0; i < SIZELEN; i++ {
+ bldr.Field(0).(*array.Int64Builder).Append(int64(i))
+ bldr.Field(1).(*array.StringBuilder).Append(fmt.Sprint(i))
+ bldr.Field(2).(*array.BooleanBuilder).Append(i%2 == 0)
+ bldr.Field(3).(*array.Int64Builder).Append(0)
+ bldr.Field(4).(*array.StringBuilder).Append("the string is the same")
+ bldr.Field(5).(*array.BooleanBuilder).Append(true)
+ }
+
+ rec := bldr.NewRecord()
+ return array.NewTableFromRecords(schema, []arrow.Record{rec})
+}
+
+func BenchmarkWriteTableCompressed(b *testing.B) {
+ mem := memory.DefaultAllocator
+ table := buildTableForTest(mem)
+ defer table.Release()
+
+ var uncompressedSize uint64
+ for idxCol := 0; int64(idxCol) < table.NumCols(); idxCol++ {
+ column := table.Column(idxCol)
+ for _, chunk := range column.Data().Chunks() {
+ uncompressedSize += chunk.Data().SizeInBytes()
+ }
+ }
+
+ var buf bytes.Buffer
+ buf.Grow(int(uncompressedSize))
+ for _, tc := range compressTestCases {
+ b.Run(fmt.Sprintf("codec=%s", tc.c), func(b *testing.B) {
+ buf.Reset()
+ b.ResetTimer()
+ b.SetBytes(int64(uncompressedSize))
+ for n := 0; n < b.N; n++ {
+ require.NoError(b,
+ pqarrow.WriteTable(
+ table,
+ &buf,
+ math.MaxInt64,
+ parquet.NewWriterProperties(parquet.WithAllocator(mem), parquet.WithCompression(tc.c)),
+ pqarrow.DefaultWriterProps(),
+ ),
+ )
+ }
+ })
+ }
+}
+
+func BenchmarkReadTableCompressed(b *testing.B) {
+ ctx := context.Background()
+ mem := memory.DefaultAllocator
+ table := buildTableForTest(mem)
+ defer table.Release()
+
+ for _, tc := range compressTestCases {
+ b.Run(fmt.Sprintf("codec=%s", tc.c), func(b *testing.B) {
+ var buf bytes.Buffer
+ err := pqarrow.WriteTable(
+ table,
+ &buf,
+ math.MaxInt64,
+ parquet.NewWriterProperties(parquet.WithAllocator(mem), parquet.WithCompression(tc.c)),
+ pqarrow.DefaultWriterProps(),
+ )
+ require.NoError(b, err)
+
+ compressedBytes := buf.Len()
+ rdr := bytes.NewReader(buf.Bytes())
+
+ b.ResetTimer()
+ b.SetBytes(int64(compressedBytes))
+ for n := 0; n < b.N; n++ {
+ tab, err := pqarrow.ReadTable(ctx, rdr, nil, pqarrow.ArrowReadProperties{}, mem)
+ require.NoError(b, err)
+ defer tab.Release()
+ }
+ })
+ }
+}