This is an automated email from the ASF dual-hosted git repository.

joellubi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6502f0e3ad GH-43790: [Go][Parquet] Add support for LZ4_RAW compression 
codec (#43835)
6502f0e3ad is described below

commit 6502f0e3ad046d361aba44385ab3379ed7af5b7f
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Tue Aug 27 13:17:39 2024 -0400

    GH-43790: [Go][Parquet] Add support for LZ4_RAW compression codec (#43835)
    
    
    
    ### Rationale for this change
    
    Fixes: #43790
    
    The LZ4 compression codec for Parquet is no longer ambiguous, as it has 
been superceded by the 
[LZ4_RAW](https://github.com/apache/parquet-format/blob/master/Compression.md#lz4_raw)
 spec.
    
    ### What changes are included in this PR?
    
    - Add `LZ4Raw` compression codec
    - Split out `StreamingCodec` methods from core `Codec` interface
    - Various conformance/roundtrip tests
    - Set of benchmarks for reading/writing an Arrow table to/from Parquet, 
using each compression codec
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    - New codec `LZ4Raw` is available
    - `Codec` interface no long provides the following methods, which are now 
part of `StreamingCodec`:
      - `NewReader`
      - `NewWriter`
      - `NewWriterLevel`
    
    * GitHub Issue: #43790
    
    Authored-by: Joel Lubinitsky <[email protected]>
    Signed-off-by: Joel Lubinitsky <[email protected]>
---
 go/parquet/compress/compress.go          |  22 ++++--
 go/parquet/compress/compress_test.go     |   8 +-
 go/parquet/compress/lz4_raw.go           |  66 ++++++++++++++++
 go/parquet/file/file_reader_test.go      | 127 +++++++++++++++++++++++++++++++
 go/parquet/file/file_writer_test.go      |  58 +++++++++++++-
 go/parquet/pqarrow/reader_writer_test.go | 111 +++++++++++++++++++++++++++
 6 files changed, 380 insertions(+), 12 deletions(-)

diff --git a/go/parquet/compress/compress.go b/go/parquet/compress/compress.go
index b6a1349133..92f2ae99bb 100644
--- a/go/parquet/compress/compress.go
+++ b/go/parquet/compress/compress.go
@@ -49,8 +49,9 @@ var Codecs = struct {
        Brotli Compression
        // LZ4 unsupported in this library due to problematic issues between 
the Hadoop LZ4 spec vs regular lz4
        // see: 
http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E
-       Lz4  Compression
-       Zstd Compression
+       Lz4    Compression
+       Zstd   Compression
+       Lz4Raw Compression
 }{
        Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
        Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
@@ -59,17 +60,12 @@ var Codecs = struct {
        Brotli:       Compression(parquet.CompressionCodec_BROTLI),
        Lz4:          Compression(parquet.CompressionCodec_LZ4),
        Zstd:         Compression(parquet.CompressionCodec_ZSTD),
+       Lz4Raw:       Compression(parquet.CompressionCodec_LZ4_RAW),
 }
 
 // Codec is an interface which is implemented for each compression type in 
order to make the interactions easy to
 // implement. Most consumers won't be calling GetCodec directly.
 type Codec interface {
-       // NewReader provides a reader that wraps a stream with compressed data 
to stream the uncompressed data
-       NewReader(io.Reader) io.ReadCloser
-       // NewWriter provides a wrapper around a write stream to compress data 
before writing it.
-       NewWriter(io.Writer) io.WriteCloser
-       // NewWriterLevel is like NewWriter but allows specifying the 
compression level
-       NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
        // Encode encodes a block of data given by src and returns the 
compressed block. dst should be either nil
        // or sized large enough to fit the compressed block (use CompressBound 
to allocate). dst and src should not
        // overlap since some of the compression types don't allow it.
@@ -90,6 +86,16 @@ type Codec interface {
        Decode(dst, src []byte) []byte
 }
 
+// StreamingCodec is an interface that may be implemented for compression 
codecs that expose a streaming API.
+type StreamingCodec interface {
+       // NewReader provides a reader that wraps a stream with compressed data 
to stream the uncompressed data
+       NewReader(io.Reader) io.ReadCloser
+       // NewWriter provides a wrapper around a write stream to compress data 
before writing it.
+       NewWriter(io.Writer) io.WriteCloser
+       // NewWriterLevel is like NewWriter but allows specifying the 
compression level
+       NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
+}
+
 var codecs = map[Compression]Codec{}
 
 // RegisterCodec adds or overrides a codec implementation for a given 
compression algorithm.
diff --git a/go/parquet/compress/compress_test.go 
b/go/parquet/compress/compress_test.go
index 843062c0d0..5aac74759e 100644
--- a/go/parquet/compress/compress_test.go
+++ b/go/parquet/compress/compress_test.go
@@ -66,8 +66,8 @@ func TestCompressDataOneShot(t *testing.T) {
                {compress.Codecs.Gzip},
                {compress.Codecs.Brotli},
                {compress.Codecs.Zstd},
+               {compress.Codecs.Lz4Raw},
                // {compress.Codecs.Lzo},
-               // {compress.Codecs.Lz4},
        }
 
        for _, tt := range tests {
@@ -107,9 +107,11 @@ func TestCompressReaderWriter(t *testing.T) {
                        var buf bytes.Buffer
                        codec, err := compress.GetCodec(tt.c)
                        assert.NoError(t, err)
+                       streamingCodec, ok := codec.(compress.StreamingCodec)
+                       assert.True(t, ok)
                        data := makeRandomData(RandomDataSize)
 
-                       wr := codec.NewWriter(&buf)
+                       wr := streamingCodec.NewWriter(&buf)
 
                        const chunkSize = 1111
                        input := data
@@ -129,7 +131,7 @@ func TestCompressReaderWriter(t *testing.T) {
                        }
                        wr.Close()
 
-                       rdr := codec.NewReader(&buf)
+                       rdr := streamingCodec.NewReader(&buf)
                        out, err := io.ReadAll(rdr)
                        assert.NoError(t, err)
                        assert.Exactly(t, data, out)
diff --git a/go/parquet/compress/lz4_raw.go b/go/parquet/compress/lz4_raw.go
new file mode 100644
index 0000000000..788d9520a6
--- /dev/null
+++ b/go/parquet/compress/lz4_raw.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+       "sync"
+
+       "github.com/pierrec/lz4/v4"
+)
+
+// lz4.Compressor is not goroutine-safe, so we use a pool to amortize the cost
+// of allocating a new one for each call to Encode().
+var compressorPool = sync.Pool{New: func() interface{} { return 
new(lz4.Compressor) }}
+
+func compressBlock(src, dst []byte) (int, error) {
+       c := compressorPool.Get().(*lz4.Compressor)
+       defer compressorPool.Put(c)
+       return c.CompressBlock(src, dst)
+}
+
+type lz4RawCodec struct{}
+
+func (c lz4RawCodec) Encode(dst, src []byte) []byte {
+       n, err := compressBlock(src, dst[:cap(dst)])
+       if err != nil {
+               panic(err)
+       }
+
+       return dst[:n]
+}
+
+func (c lz4RawCodec) EncodeLevel(dst, src []byte, _ int) []byte {
+       // the lz4 block implementation does not allow level to be set
+       return c.Encode(dst, src)
+}
+
+func (lz4RawCodec) Decode(dst, src []byte) []byte {
+       n, err := lz4.UncompressBlock(src, dst)
+       if err != nil {
+               panic(err)
+       }
+
+       return dst[:n]
+}
+
+func (c lz4RawCodec) CompressBound(len int64) int64 {
+       return int64(lz4.CompressBlockBound(int(len)))
+}
+
+func init() {
+       RegisterCodec(Codecs.Lz4Raw, lz4RawCodec{})
+}
diff --git a/go/parquet/file/file_reader_test.go 
b/go/parquet/file/file_reader_test.go
index 547ec475c2..35f4da4e86 100644
--- a/go/parquet/file/file_reader_test.go
+++ b/go/parquet/file/file_reader_test.go
@@ -644,3 +644,130 @@ func TestDeltaBinaryPackedMultipleBatches(t *testing.T) {
 
        require.Equalf(t, size, totalRows, "Expected %d rows, but got %d rows", 
size, totalRows)
 }
+
+// Test read file lz4_raw_compressed.parquet
+// Contents documented at 
https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawFileRead(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_DATA")
+       if dir == "" {
+               t.Skip("no path supplied with PARQUET_TEST_DATA")
+       }
+       require.DirExists(t, dir)
+
+       props := parquet.NewReaderProperties(memory.DefaultAllocator)
+       fileReader, err := file.OpenParquetFile(path.Join(dir, 
"lz4_raw_compressed.parquet"),
+               false, file.WithReadProps(props))
+       require.NoError(t, err)
+       defer fileReader.Close()
+
+       nRows := 4
+       nCols := 3
+       require.Equal(t, 1, fileReader.NumRowGroups())
+       rgr := fileReader.RowGroup(0)
+       require.EqualValues(t, nRows, rgr.NumRows())
+       require.EqualValues(t, nCols, rgr.NumColumns())
+
+       rdr, err := rgr.Column(0)
+       require.NoError(t, err)
+
+       rowsInt64, ok := rdr.(*file.Int64ColumnChunkReader)
+       require.True(t, ok)
+
+       valsInt64 := make([]int64, nRows)
+       total, read, err := rowsInt64.ReadBatch(int64(nRows), valsInt64, nil, 
nil)
+       require.NoError(t, err)
+       require.Equal(t, int64(nRows), total)
+       require.Equal(t, nRows, read)
+
+       expectedValsInt64 := []int64{
+               1593604800,
+               1593604800,
+               1593604801,
+               1593604801,
+       }
+       require.Equal(t, expectedValsInt64, valsInt64)
+
+       rdr, err = rgr.Column(1)
+       require.NoError(t, err)
+
+       rowsByteArray, ok := rdr.(*file.ByteArrayColumnChunkReader)
+       require.True(t, ok)
+
+       valsByteArray := make([]parquet.ByteArray, nRows)
+       total, read, err = rowsByteArray.ReadBatch(int64(nRows), valsByteArray, 
nil, nil)
+       require.NoError(t, err)
+       require.Equal(t, int64(nRows), total)
+       require.Equal(t, nRows, read)
+
+       expectedValsByteArray := []parquet.ByteArray{
+               []byte("abc"),
+               []byte("def"),
+               []byte("abc"),
+               []byte("def"),
+       }
+       require.Equal(t, expectedValsByteArray, valsByteArray)
+
+       rdr, err = rgr.Column(2)
+       require.NoError(t, err)
+
+       rowsFloat64, ok := rdr.(*file.Float64ColumnChunkReader)
+       require.True(t, ok)
+
+       valsFloat64 := make([]float64, nRows)
+       total, read, err = rowsFloat64.ReadBatch(int64(nRows), valsFloat64, 
nil, nil)
+       require.NoError(t, err)
+       require.Equal(t, int64(nRows), total)
+       require.Equal(t, nRows, read)
+
+       expectedValsFloat64 := []float64{
+               42.0,
+               7.7,
+               42.125,
+               7.7,
+       }
+       require.Equal(t, expectedValsFloat64, valsFloat64)
+}
+
+// Test read file lz4_raw_compressed_larger.parquet
+// Contents documented at 
https://github.com/apache/parquet-testing/commit/ddd898958803cb89b7156c6350584d1cda0fe8de
+func TestLZ4RawLargerFileRead(t *testing.T) {
+       dir := os.Getenv("PARQUET_TEST_DATA")
+       if dir == "" {
+               t.Skip("no path supplied with PARQUET_TEST_DATA")
+       }
+       require.DirExists(t, dir)
+
+       props := parquet.NewReaderProperties(memory.DefaultAllocator)
+       fileReader, err := file.OpenParquetFile(path.Join(dir, 
"lz4_raw_compressed_larger.parquet"),
+               false, file.WithReadProps(props))
+       require.NoError(t, err)
+       defer fileReader.Close()
+
+       nRows := 10000
+       nCols := 1
+       require.Equal(t, 1, fileReader.NumRowGroups())
+       rgr := fileReader.RowGroup(0)
+       require.EqualValues(t, nRows, rgr.NumRows())
+       require.EqualValues(t, nCols, rgr.NumColumns())
+
+       rdr, err := rgr.Column(0)
+       require.NoError(t, err)
+
+       rows, ok := rdr.(*file.ByteArrayColumnChunkReader)
+       require.True(t, ok)
+
+       vals := make([]parquet.ByteArray, nRows)
+       total, read, err := rows.ReadBatch(int64(nRows), vals, nil, nil)
+       require.NoError(t, err)
+       require.Equal(t, int64(nRows), total)
+       require.Equal(t, nRows, read)
+
+       expectedValsHead := []parquet.ByteArray{
+               []byte("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"),
+               []byte("e8fb9197-cb9f-4118-b67f-fbfa65f61843"),
+               []byte("885136e1-0aa1-4fdb-8847-63d87b07c205"),
+               []byte("ce7b2019-8ebe-4906-a74d-0afa2409e5df"),
+               []byte("a9ee2527-821b-4b71-a926-03f73c3fc8b7"),
+       }
+       require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)])
+}
diff --git a/go/parquet/file/file_writer_test.go 
b/go/parquet/file/file_writer_test.go
index 0faf3f7233..12ac93d1ef 100644
--- a/go/parquet/file/file_writer_test.go
+++ b/go/parquet/file/file_writer_test.go
@@ -260,7 +260,7 @@ func (t *SerializeTestSuite) TestSmallFile() {
                compress.Codecs.Brotli,
                compress.Codecs.Gzip,
                compress.Codecs.Zstd,
-               // compress.Codecs.Lz4,
+               compress.Codecs.Lz4Raw,
                // compress.Codecs.Lzo,
        }
        for _, c := range codecs {
@@ -540,3 +540,59 @@ func TestBatchedByteStreamSplitFileRoundtrip(t *testing.T) 
{
 
        require.NoError(t, rdr.Close())
 }
+
+func TestLZ4RawFileRoundtrip(t *testing.T) {
+       input := []int64{
+               -1, 0, 1, 2, 3, 4, 5, 123456789, -123456789,
+       }
+
+       size := len(input)
+
+       field, err := schema.NewPrimitiveNodeLogical("int64", 
parquet.Repetitions.Required, nil, parquet.Types.Int64, 0, 1)
+       require.NoError(t, err)
+
+       schema, err := schema.NewGroupNode("test", 
parquet.Repetitions.Required, schema.FieldList{field}, 0)
+       require.NoError(t, err)
+
+       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+       writer := file.NewParquetWriter(sink, schema, 
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Lz4Raw))))
+
+       rgw := writer.AppendRowGroup()
+       cw, err := rgw.NextColumn()
+       require.NoError(t, err)
+
+       i64ColumnWriter, ok := cw.(*file.Int64ColumnChunkWriter)
+       require.True(t, ok)
+
+       nVals, err := i64ColumnWriter.WriteBatch(input, nil, nil)
+       require.NoError(t, err)
+       require.EqualValues(t, size, nVals)
+
+       require.NoError(t, cw.Close())
+       require.NoError(t, rgw.Close())
+       require.NoError(t, writer.Close())
+
+       rdr, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
+       require.NoError(t, err)
+
+       require.Equal(t, 1, rdr.NumRowGroups())
+       require.EqualValues(t, size, rdr.NumRows())
+
+       rgr := rdr.RowGroup(0)
+       cr, err := rgr.Column(0)
+       require.NoError(t, err)
+
+       i64ColumnReader, ok := cr.(*file.Int64ColumnChunkReader)
+       require.True(t, ok)
+
+       output := make([]int64, size)
+
+       total, valuesRead, err := i64ColumnReader.ReadBatch(int64(size), 
output, nil, nil)
+       require.NoError(t, err)
+       require.EqualValues(t, size, total)
+       require.EqualValues(t, size, valuesRead)
+
+       require.Equal(t, input, output)
+
+       require.NoError(t, rdr.Close())
+}
diff --git a/go/parquet/pqarrow/reader_writer_test.go 
b/go/parquet/pqarrow/reader_writer_test.go
index 31bd0eba84..e020c7d945 100644
--- a/go/parquet/pqarrow/reader_writer_test.go
+++ b/go/parquet/pqarrow/reader_writer_test.go
@@ -19,6 +19,8 @@ package pqarrow_test
 import (
        "bytes"
        "context"
+       "fmt"
+       "math"
        "testing"
        "unsafe"
 
@@ -26,8 +28,10 @@ import (
        "github.com/apache/arrow/go/v18/arrow/array"
        "github.com/apache/arrow/go/v18/arrow/memory"
        "github.com/apache/arrow/go/v18/parquet"
+       "github.com/apache/arrow/go/v18/parquet/compress"
        "github.com/apache/arrow/go/v18/parquet/file"
        "github.com/apache/arrow/go/v18/parquet/pqarrow"
+       "github.com/stretchr/testify/require"
        "golang.org/x/exp/rand"
        "gonum.org/v1/gonum/stat/distuv"
 )
@@ -275,3 +279,110 @@ func BenchmarkReadColumnFloat64(b *testing.B) {
                benchReadTable(b, tt.name, tbl, 
int64(arrow.Int32Traits.BytesRequired(SIZELEN)))
        }
 }
+
+var compressTestCases = []struct {
+       c compress.Compression
+}{
+       {compress.Codecs.Uncompressed},
+       {compress.Codecs.Snappy},
+       {compress.Codecs.Gzip},
+       {compress.Codecs.Brotli},
+       {compress.Codecs.Zstd},
+       {compress.Codecs.Lz4Raw},
+       // {compress.Codecs.Lzo},
+}
+
+func buildTableForTest(mem memory.Allocator) arrow.Table {
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "int64s", Type: arrow.PrimitiveTypes.Int64},
+                       {Name: "strings", Type: arrow.BinaryTypes.String},
+                       {Name: "bools", Type: arrow.FixedWidthTypes.Boolean},
+                       {Name: "repeated_int64s", Type: 
arrow.PrimitiveTypes.Int64},
+                       {Name: "repeated_strings", Type: 
arrow.BinaryTypes.String},
+                       {Name: "repeated_bools", Type: 
arrow.FixedWidthTypes.Boolean},
+               },
+               nil,
+       )
+       bldr := array.NewRecordBuilder(mem, schema)
+       defer bldr.Release()
+
+       for i := 0; i < SIZELEN; i++ {
+               bldr.Field(0).(*array.Int64Builder).Append(int64(i))
+               bldr.Field(1).(*array.StringBuilder).Append(fmt.Sprint(i))
+               bldr.Field(2).(*array.BooleanBuilder).Append(i%2 == 0)
+               bldr.Field(3).(*array.Int64Builder).Append(0)
+               bldr.Field(4).(*array.StringBuilder).Append("the string is the 
same")
+               bldr.Field(5).(*array.BooleanBuilder).Append(true)
+       }
+
+       rec := bldr.NewRecord()
+       return array.NewTableFromRecords(schema, []arrow.Record{rec})
+}
+
+func BenchmarkWriteTableCompressed(b *testing.B) {
+       mem := memory.DefaultAllocator
+       table := buildTableForTest(mem)
+       defer table.Release()
+
+       var uncompressedSize uint64
+       for idxCol := 0; int64(idxCol) < table.NumCols(); idxCol++ {
+               column := table.Column(idxCol)
+               for _, chunk := range column.Data().Chunks() {
+                       uncompressedSize += chunk.Data().SizeInBytes()
+               }
+       }
+
+       var buf bytes.Buffer
+       buf.Grow(int(uncompressedSize))
+       for _, tc := range compressTestCases {
+               b.Run(fmt.Sprintf("codec=%s", tc.c), func(b *testing.B) {
+                       buf.Reset()
+                       b.ResetTimer()
+                       b.SetBytes(int64(uncompressedSize))
+                       for n := 0; n < b.N; n++ {
+                               require.NoError(b,
+                                       pqarrow.WriteTable(
+                                               table,
+                                               &buf,
+                                               math.MaxInt64,
+                                               
parquet.NewWriterProperties(parquet.WithAllocator(mem), 
parquet.WithCompression(tc.c)),
+                                               pqarrow.DefaultWriterProps(),
+                                       ),
+                               )
+                       }
+               })
+       }
+}
+
+func BenchmarkReadTableCompressed(b *testing.B) {
+       ctx := context.Background()
+       mem := memory.DefaultAllocator
+       table := buildTableForTest(mem)
+       defer table.Release()
+
+       for _, tc := range compressTestCases {
+               b.Run(fmt.Sprintf("codec=%s", tc.c), func(b *testing.B) {
+                       var buf bytes.Buffer
+                       err := pqarrow.WriteTable(
+                               table,
+                               &buf,
+                               math.MaxInt64,
+                               
parquet.NewWriterProperties(parquet.WithAllocator(mem), 
parquet.WithCompression(tc.c)),
+                               pqarrow.DefaultWriterProps(),
+                       )
+                       require.NoError(b, err)
+
+                       compressedBytes := buf.Len()
+                       rdr := bytes.NewReader(buf.Bytes())
+
+                       b.ResetTimer()
+                       b.SetBytes(int64(compressedBytes))
+                       for n := 0; n < b.N; n++ {
+                               tab, err := pqarrow.ReadTable(ctx, rdr, nil, 
pqarrow.ArrowReadProperties{}, mem)
+                               require.NoError(b, err)
+                               defer tab.Release()
+                       }
+               })
+       }
+}

Reply via email to