emkornfield commented on a change in pull request #9817:
URL: https://github.com/apache/arrow/pull/9817#discussion_r609822671



##########
File path: go/parquet/compress/brotli.go
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+
+       "github.com/andybalholm/brotli"
+)
+
+type brotliCodec struct{}
+
+func (brotliCodec) NewReader(r io.Reader) io.ReadCloser {
+       return ioutil.NopCloser(brotli.NewReader(r))
+}
+
+func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte {
+       if level == DefaultCompressionLevel {
+               level = brotli.DefaultCompression
+       }
+
+       maxlen := int(b.CompressBound(int64(len(src))))
+       if dst == nil || cap(dst) < maxlen {
+               dst = make([]byte, 0, maxlen)
+       }
+       buf := bytes.NewBuffer(dst[:0])
+       w := brotli.NewWriterLevel(buf, level)
+       _, err := w.Write(src)
+       if err != nil {
+               panic(err)
+       }
+       if err := w.Close(); err != nil {
+               panic(err)
+       }
+       return buf.Bytes()
+}
+
+func (b brotliCodec) Encode(dst, src []byte) []byte {
+       return b.EncodeLevel(dst, src, brotli.DefaultCompression)
+}
+
+func (brotliCodec) Decode(dst, src []byte) []byte {
+       rdr := brotli.NewReader(bytes.NewReader(src))
+       if dst != nil {
+               var (
+                       sofar       = 0
+                       n           = -1
+                       err   error = nil
+               )
+               for n != 0 && err == nil {
+                       n, err = rdr.Read(dst[sofar:])
+                       sofar += n
+               }
+               if err != nil && err != io.EOF {
+                       panic(err)
+               }
+               return dst[:sofar]
+       }
+
+       dst, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               panic(err)
+       }
+
+       return dst
+}
+
+// taken from brotli/enc/encode.c

Review comment:
       please provide function/macro name if possible.
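       For reference, the bracketed comment below matches the one in brotli's `BrotliEncoderMaxCompressedSize`, so that is presumably the routine being mirrored and the Go comment could name it directly. A minimal sketch, assuming that is indeed the source (the hunk is cut off here, so the actual body may differ):

```go
// CompressBound mirrors BrotliEncoderMaxCompressedSize from brotli/enc/encode.c.
func (brotliCodec) CompressBound(len int64) int64 {
	// [window bits / empty metadata] + N * [uncompressed] + [last empty]
	nMetablocks := len >> 14
	overhead := 2 + (4 * nMetablocks) + 3 + 1
	if len == 0 {
		return 2
	}
	result := len + overhead
	if result < len { // overflow guard, as in the C source
		return 0
	}
	return result
}
```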

##########
File path: go/parquet/compress/compress.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package compress contains the interfaces and implementations for handling compression/decompression
+// of parquet data at the column levels.
+package compress
+
+import (
+       "compress/flate"
+       "io"
+       "io/ioutil"
+
+       "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+)
+
+// Compression is an alias to the thrift compression codec enum type for easy use
+type Compression parquet.CompressionCodec
+
+func (c Compression) String() string {
+       return parquet.CompressionCodec(c).String()
+}
+
+// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries
+// use that to denote "use the default".
+const DefaultCompressionLevel = flate.DefaultCompression
+
+// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use
+// which make for easy internal swapping between them and the thrift enum since they are initialized to the same
+// constant values.
+var Codecs = struct {
+       Uncompressed Compression
+       Snappy       Compression
+       Gzip         Compression
+       // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang.
+       Lzo    Compression
+       Brotli Compression
+       // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
+       // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E
+       Lz4  Compression
+       Zstd Compression
+}{
+       Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
+       Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
+       Gzip:         Compression(parquet.CompressionCodec_GZIP),
+       Lzo:          Compression(parquet.CompressionCodec_LZO),
+       Brotli:       Compression(parquet.CompressionCodec_BROTLI),
+       Lz4:          Compression(parquet.CompressionCodec_LZ4),
+       Zstd:         Compression(parquet.CompressionCodec_ZSTD),
+}
+
+// Codec is an interface which is implemented for each compression type in order to make the interactions easy to
+// implement. Most consumers won't be calling GetCodec directly.
+type Codec interface {
+       // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
+       NewReader(io.Reader) io.ReadCloser
+       // NewWriter provides a wrapper around a write stream to compress data before writing it.
+       NewWriter(io.Writer) io.WriteCloser
+       // NewWriterLevel is like NewWrapper but allows specifying the compression level
+       NewWriterLevel(io.Writer, int) (io.WriteCloser, error)

Review comment:
       API comment: is it worth exposing the compression level in the API? An alternative would have been a member set at construction time.
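       For comparison, a minimal sketch of the construction-time alternative. The names (levelCodec, FixedLevel) are illustrative only, and it assumes the Codec interface also exposes EncodeLevel the way the brotli and zstd implementations do:

```go
// levelCodec binds a compression level at construction so callers only ever
// use NewWriter/Encode; the per-call level parameter disappears from the API.
type levelCodec struct {
	Codec
	level int
}

// FixedLevel wraps an existing Codec with a fixed compression level.
func FixedLevel(c Codec, level int) Codec { return levelCodec{c, level} }

func (lc levelCodec) NewWriter(w io.Writer) io.WriteCloser {
	wc, err := lc.Codec.NewWriterLevel(w, lc.level)
	if err != nil {
		panic(err) // follows the panic-on-error convention used in this package
	}
	return wc
}

func (lc levelCodec) Encode(dst, src []byte) []byte {
	return lc.Codec.EncodeLevel(dst, src, lc.level)
}
```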

##########
File path: go/parquet/compress/brotli.go
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+
+       "github.com/andybalholm/brotli"
+)
+
+type brotliCodec struct{}
+
+func (brotliCodec) NewReader(r io.Reader) io.ReadCloser {
+       return ioutil.NopCloser(brotli.NewReader(r))
+}
+
+func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte {
+       if level == DefaultCompressionLevel {
+               level = brotli.DefaultCompression
+       }
+
+       maxlen := int(b.CompressBound(int64(len(src))))
+       if dst == nil || cap(dst) < maxlen {
+               dst = make([]byte, 0, maxlen)
+       }
+       buf := bytes.NewBuffer(dst[:0])
+       w := brotli.NewWriterLevel(buf, level)
+       _, err := w.Write(src)
+       if err != nil {
+               panic(err)
+       }
+       if err := w.Close(); err != nil {
+               panic(err)
+       }
+       return buf.Bytes()
+}
+
+func (b brotliCodec) Encode(dst, src []byte) []byte {
+       return b.EncodeLevel(dst, src, brotli.DefaultCompression)
+}
+
+func (brotliCodec) Decode(dst, src []byte) []byte {
+       rdr := brotli.NewReader(bytes.NewReader(src))
+       if dst != nil {
+               var (
+                       sofar       = 0
+                       n           = -1
+                       err   error = nil
+               )
+               for n != 0 && err == nil {
+                       n, err = rdr.Read(dst[sofar:])
+                       sofar += n
+               }
+               if err != nil && err != io.EOF {
+                       panic(err)
+               }
+               return dst[:sofar]
+       }
+
+       dst, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               panic(err)
+       }
+
+       return dst
+}
+
+// taken from brotli/enc/encode.c
+func (brotliCodec) CompressBound(len int64) int64 {
+       // [window bits / empty metadata] + N * [uncompressed] + [last empty]

Review comment:
       maybe assert len > 0
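       Go has no assert, so an explicit guard is the closest equivalent. A sketch of where it could live, using a hypothetical wrapper so the existing method body stays untouched (compressBoundChecked is not part of this PR):

```go
// compressBoundChecked shows where a len > 0 guard could sit; codec.CompressBound
// is the method defined in this file.
func compressBoundChecked(codec brotliCodec, n int64) int64 {
	if n <= 0 {
		// mirrors the spirit of an assert: a non-positive length is a caller bug
		panic("parquet/compress: brotli CompressBound requires len > 0")
	}
	return codec.CompressBound(n)
}
```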

##########
File path: go/parquet/types.go
##########
@@ -165,3 +169,186 @@ func (fixedLenByteArrayTraits) CastFromBytes(b []byte) []FixedLenByteArray {
 
        return res
 }
+
+// Creating our own enums allows avoiding the transitive dependency on the
+// compiled thrift definitions in the public API, allowing us to not export
+// the entire Thrift definitions, while making everything a simple cast between.
+//
+// It also let's us add special values like NONE to distinguish between values
+// that are set or not set
+type (
+       // Type is the physical type as in parquet.thrift
+       Type format.Type
+       // Cipher is the parquet Cipher Algorithms
+       Cipher int
+       // ColumnOrder is the Column Order from the parquet.thrift
+       ColumnOrder *format.ColumnOrder
+       // Version is the parquet version type
+       Version int8
+       // DataPageVersion is the version of the Parquet Data Pages
+       DataPageVersion int8
+       // Encoding is the parquet Encoding type
+       Encoding format.Encoding
+       // Repetition is the underlying parquet field repetition type as in parquet.thrift
+       Repetition format.FieldRepetitionType
+       // ColumnPath is the path from the root of the schema to a given column
+       ColumnPath []string
+)
+
+func (c ColumnPath) String() string {
+       if c == nil {
+               return ""
+       }
+       return strings.Join(c, ".")
+}
+
+// Extend creates a new ColumnPath from an existing one, with the new ColumnPath having s appended to the end.
+func (c ColumnPath) Extend(s string) ColumnPath {
+       p := make([]string, len(c), len(c)+1)
+       copy(p, c)
+       return append(p, s)
+}
+
+// ColumnPathFromString constructs a ColumnPath from a dot separated string
+func ColumnPathFromString(s string) ColumnPath {
+       return strings.Split(s, ".")
+}
+
+// constants for choosing the Aes Algorithm to use for encryption/decryption
+const (
+       AesGcm Cipher = iota
+       AesCtr
+)
+
+// Constants for the parquet Version
+const (
+       V1 Version = 1
+       V2 Version = 2
+)
+
+// constants for the parquet DataPage Version to use
+const (
+       DataPageV1 DataPageVersion = iota
+       DataPageV2
+)
+
+func (e Encoding) String() string {
+       return format.Encoding(e).String()
+}
+
+var (
+       // Types contains constants for the Physical Types that are used in the Parquet Spec
+       //
+       // They can be specified when needed as such: `parquet.Types.Int32` etc. The values
+       // all correspond to the values in parquet.thrift
+       Types = struct {
+               Boolean           Type
+               Int32             Type
+               Int64             Type
+               Int96             Type
+               Float             Type
+               Double            Type
+               ByteArray         Type
+               FixedLenByteArray Type
+               // this only exists as a convienence so we can denote it when necessary
+               // nearly all functions that take a parquet.Type will error/panic if given
+               // Undefined
+               Undefined Type
+       }{
+               Boolean:           Type(format.Type_BOOLEAN),
+               Int32:             Type(format.Type_INT32),
+               Int64:             Type(format.Type_INT64),
+               Int96:             Type(format.Type_INT96),
+               Float:             Type(format.Type_FLOAT),
+               Double:            Type(format.Type_DOUBLE),
+               ByteArray:         Type(format.Type_BYTE_ARRAY),
+               FixedLenByteArray: Type(format.Type_FIXED_LEN_BYTE_ARRAY),
+               Undefined:         Type(format.Type_FIXED_LEN_BYTE_ARRAY + 1),
+       }
+
+       // Encodings contains constants for the encoding types of the column data
+       //
+       // The values used all correspond to the values in parquet.thrift for the
+       // corresponding encoding type.
+       Encodings = struct {
+               Plain                Encoding
+               PlainDict            Encoding
+               RLE                  Encoding
+               RLEDict              Encoding
+               BitPacked            Encoding // deprecated, not implemented
+               DeltaByteArray       Encoding
+               DeltaBinaryPacked    Encoding
+               DeltaLengthByteArray Encoding
+       }{
+               Plain:                Encoding(format.Encoding_PLAIN),
+               PlainDict:            Encoding(format.Encoding_PLAIN_DICTIONARY),
+               RLE:                  Encoding(format.Encoding_RLE),
+               RLEDict:              Encoding(format.Encoding_RLE_DICTIONARY),
+               BitPacked:            Encoding(format.Encoding_BIT_PACKED),
+               DeltaByteArray:       Encoding(format.Encoding_DELTA_BYTE_ARRAY),
+               DeltaBinaryPacked:    Encoding(format.Encoding_DELTA_BINARY_PACKED),
+               DeltaLengthByteArray: Encoding(format.Encoding_DELTA_LENGTH_BYTE_ARRAY),
+       }
+
+       // ColumnOrders contains constants for the Column Ordering fields
+       ColumnOrders = struct {
+               Undefined        ColumnOrder
+               TypeDefinedOrder ColumnOrder
+       }{
+               Undefined:        format.NewColumnOrder(),
+               TypeDefinedOrder: &format.ColumnOrder{TYPE_ORDER: format.NewTypeDefinedOrder()},
+       }
+
+       // DefaultColumnOrder is to use TypeDefinedOrder
+       DefaultColumnOrder = ColumnOrders.TypeDefinedOrder
+
+       // Repetitions contains the constants for Field Repetition Types
+       Repetitions = struct {
+               Required  Repetition
+               Optional  Repetition
+               Repeated  Repetition
+               Undefined Repetition // convenience value

Review comment:
       It might pay to distinguish between undefined and not set. An issue was raised recently that, at least in C++, we write Repetition required for the root of the schema when according to the spec we probably shouldn't.
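       One possible shape for that distinction, sketched with illustrative names (the numeric choices and the sentinel names are not part of this PR):

```go
// Keep Undefined for "a value was present but is not a recognized enum value"
// and add a separate sentinel for "no repetition should be written at all",
// which is what the schema root needs per the spec discussion above.
const (
	RepetitionUndefined = Repetition(format.FieldRepetitionType_REPEATED) + 1
	RepetitionUnset     = RepetitionUndefined + 1
)
```

       A writer could then skip emitting the repetition field entirely when it sees the unset sentinel rather than writing REQUIRED for the root.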

##########
File path: go/parquet/compress/compress.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package compress contains the interfaces and implementations for handling compression/decompression
+// of parquet data at the column levels.
+package compress
+
+import (
+       "compress/flate"
+       "io"
+       "io/ioutil"
+
+       "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+)
+
+// Compression is an alias to the thrift compression codec enum type for easy use
+type Compression parquet.CompressionCodec
+
+func (c Compression) String() string {
+       return parquet.CompressionCodec(c).String()
+}
+
+// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries
+// use that to denote "use the default".
+const DefaultCompressionLevel = flate.DefaultCompression
+
+// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use
+// which make for easy internal swapping between them and the thrift enum since they are initialized to the same
+// constant values.
+var Codecs = struct {
+       Uncompressed Compression
+       Snappy       Compression
+       Gzip         Compression
+       // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang.
+       Lzo    Compression

Review comment:
       Forget if I already commented, but chances are you won't find one without potential licensing concerns (I think LZO is GPL, which I believe is incompatible with the ASL).

##########
File path: go/parquet/writer_properties.go
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package parquet
+
+import (
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet/compress"
+)
+
+// Constants for default property values used for the default reader, writer and column props.
+const (
+       // Default Buffer size used for the Reader
+       DefaultBufSize int64 = 4096 * 4
+       // Default data page size limit is 1K it's not guaranteed, but we will try to
+       // cut data pages off at this size where possible.
+       DefaultDataPageSize int64 = 1024 * 1024
+       // Default is for dictionary encoding to be turned on, use WithDictionaryDefault
+       // writer property to change that.
+       DefaultDictionaryEnabled = true
+       // If the dictionary reaches the size of this limitation, the writer will use
+       // the fallback encoding (usually plain) instead of continuing to build the
+       // dictionary index.
+       DefaultDictionaryPageSizeLimit = DefaultDataPageSize
+       // In order to attempt to facilitate data page size limits for writing,
+       // data is written in batches. Increasing the batch size may improve performance
+       // but the larger the batch size, the easier it is to overshoot the datapage limit.
+       DefaultWriteBatchSize int64 = 1024
+       // Default maximum number of rows for a single row group
+       DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024
+       // Default is to have stats enabled for all columns, use writer properties to
+       // change the default, or to enable/disable for specific columns.
+       DefaultStatsEnabled = true
+       // If the stats are larger than 4K the writer will skip writing them out anyways.
+       DefaultMaxStatsSize int64 = 4096
+       DefaultCreatedBy          = "parquet-go version 1.0.0"
+)
+
+// ColumnProperties defines the encoding, codec, and so on for a given column.
+type ColumnProperties struct {
+       Encoding          Encoding
+       Codec             compress.Compression
+       DictionaryEnabled bool
+       StatsEnabled      bool
+       MaxStatsSize      int64
+       CompressionLevel  int
+}
+
+// DefaultColumnProperties returns the default properties which get utilized for writing.
+//
+// The default column properties are the following constants:
+//     Encoding:                                               Encodings.Plain
+//     Codec:                                                  compress.Codecs.Uncompressed
+//     DictionaryEnabled:      DefaultDictionaryEnabled
+//     StatsEnabled:                           DefaultStatsEnabled
+//     MaxStatsSize:                           DefaultMaxStatsSize
+//     CompressionLevel:               compress.DefaultCompressionLevel
+func DefaultColumnProperties() ColumnProperties {
+       return ColumnProperties{
+               Encoding:          Encodings.Plain,
+               Codec:             compress.Codecs.Uncompressed,
+               DictionaryEnabled: DefaultDictionaryEnabled,
+               StatsEnabled:      DefaultStatsEnabled,
+               MaxStatsSize:      DefaultMaxStatsSize,
+               CompressionLevel:  compress.DefaultCompressionLevel,
+       }
+}
+
+type writerPropConfig struct {
+       wr            *WriterProperties
+       encodings     map[string]Encoding
+       codecs        map[string]compress.Compression
+       compressLevel map[string]int
+       dictEnabled   map[string]bool
+       statsEnabled  map[string]bool
+}
+
+// WriterProperty is used as the options for building a writer properties instance
+type WriterProperty func(*writerPropConfig)
+
+// WithAllocator specifies the writer to use the given allocator
+func WithAllocator(mem memory.Allocator) WriterProperty {
+       return func(cfg *writerPropConfig) {
+               cfg.wr.mem = mem
+       }
+}
+
+// WithDictionaryDefault sets the default value for whether to enable dictionary encoding
+func WithDictionaryDefault(dict bool) WriterProperty {
+       return func(cfg *writerPropConfig) {
+               cfg.wr.defColumnProps.DictionaryEnabled = dict
+       }
+}
+
+// WithDictionaryFor allows enabling or disabling dictionary encoding for a given column path string
+func WithDictionaryFor(path string, dict bool) WriterProperty {
+       return func(cfg *writerPropConfig) {
+               cfg.dictEnabled[path] = dict
+       }
+}
+
+// WithDictionaryPath is like WithDictionaryFor, but takes a ColumnPath type
+func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty {
+       return WithDictionaryFor(path.String(), dict)
+}
+
+// WithDictionaryPageSizeLimit is the limit of the dictionary at which the writer
+// will fallback to plain encoding instead
+func WithDictionaryPageSizeLimit(limit int64) WriterProperty {
+       return func(cfg *writerPropConfig) {
+               cfg.wr.dictPagesize = limit
+       }
+}
+
+// WithBatchSize specifies the number of rows to use for batch writes to columns
+func WithBatchSize(batch int64) WriterProperty {
+       return func(cfg *writerPropConfig) {
+               cfg.wr.batchSize = batch
+       }
+}
+
+// WithMaxRowGroupLength specifies the number of rows as the maximum number of rows for a given row group in the writer.
+func WithMaxRowGroupLength(nrows int64) WriterProperty {

Review comment:
       Not sure if this was mirrored on the C++ implementation intentionally; just wanted to check that this pattern is idiomatic Go?
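       The functional-options pattern is fairly idiomatic in Go. Assuming this file also exposes a constructor along the lines of NewWriterProperties(opts ...WriterProperty) (an assumption, since it isn't shown in this hunk), usage would read roughly like:

```go
package main

import "github.com/apache/arrow/go/parquet"

func main() {
	// NewWriterProperties is assumed here; the With* option names are the ones
	// defined in writer_properties.go above.
	props := parquet.NewWriterProperties(
		parquet.WithMaxRowGroupLength(128*1024),
		parquet.WithBatchSize(2048),
		parquet.WithDictionaryDefault(false),
	)
	_ = props
}
```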

##########
File path: go/parquet/compress/brotli.go
##########
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+
+       "github.com/andybalholm/brotli"
+)
+
+type brotliCodec struct{}
+
+func (brotliCodec) NewReader(r io.Reader) io.ReadCloser {
+       return ioutil.NopCloser(brotli.NewReader(r))
+}
+
+func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte {
+       if level == DefaultCompressionLevel {
+               level = brotli.DefaultCompression
+       }
+
+       maxlen := int(b.CompressBound(int64(len(src))))
+       if dst == nil || cap(dst) < maxlen {
+               dst = make([]byte, 0, maxlen)
+       }
+       buf := bytes.NewBuffer(dst[:0])
+       w := brotli.NewWriterLevel(buf, level)
+       _, err := w.Write(src)
+       if err != nil {
+               panic(err)
+       }
+       if err := w.Close(); err != nil {
+               panic(err)
+       }
+       return buf.Bytes()
+}
+
+func (b brotliCodec) Encode(dst, src []byte) []byte {
+       return b.EncodeLevel(dst, src, brotli.DefaultCompression)
+}
+
+func (brotliCodec) Decode(dst, src []byte) []byte {
+       rdr := brotli.NewReader(bytes.NewReader(src))
+       if dst != nil {
+               var (
+                       sofar       = 0
+                       n           = -1
+                       err   error = nil
+               )
+               for n != 0 && err == nil {
+                       n, err = rdr.Read(dst[sofar:])
+                       sofar += n
+               }
+               if err != nil && err != io.EOF {
+                       panic(err)
+               }
+               return dst[:sofar]
+       }
+
+       dst, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               panic(err)
+       }
+
+       return dst
+}
+
+// taken from brotli/enc/encode.c

Review comment:
       I find it curious that there are at least two Go bindings to compression libraries that don't give this out of the box.

##########
File path: go/parquet/reader_properties.go
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package parquet
+
+import (
+       "bytes"
+       "io"
+
+       "github.com/apache/arrow/go/arrow/ipc"
+       "github.com/apache/arrow/go/arrow/memory"
+       "golang.org/x/xerrors"
+)
+
+// ReaderProperties are used to define how the file reader will handle buffering and allocating buffers
+type ReaderProperties struct {
+       alloc memory.Allocator
+       // Default buffer size to utilize when reading chunks
+       BufferSize int64
+       // create with NewFileDecryptionProperties if dealing with an encrypted file
+       FileDecryptProps *FileDecryptionProperties
+       // If this is set to true, then the reader will use SectionReader to
+       // just use the read stream when reading data. Otherwise we will buffer
+       // the data we're going to read into memory first and then read that buffer.
+       BufferedStreamEnabled bool

Review comment:
       it might pay to add a little more commentary here on why one would 
choose one over the other.
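       For example, the doc comment could spell out the tradeoff. Suggested wording only, based on the usual buffered-stream semantics (incremental reads vs. reading the whole chunk up front); the field itself is unchanged:

```go
	// If BufferedStreamEnabled is true, column chunk data is read incrementally
	// through a SectionReader, keeping memory usage bounded by BufferSize at the
	// cost of more, smaller reads against the source. If false (the default),
	// each chunk is read into an in-memory buffer first, which trades higher
	// peak memory for fewer reads against the underlying stream.
	BufferedStreamEnabled bool
```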

##########
File path: go/parquet/compress/zstd.go
##########
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package compress
+
+import (
+       "io"
+       "sync"
+
+       "github.com/klauspost/compress/zstd"
+)
+
+type zstdCodec struct{}
+
+type zstdcloser struct {
+       *zstd.Decoder
+}
+
+var (
+       enc         *zstd.Encoder
+       dec         *zstd.Decoder
+       initEncoder sync.Once
+       initDecoder sync.Once
+)
+
+func getencoder() *zstd.Encoder {
+       initEncoder.Do(func() {
+               enc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true))
+       })
+       return enc
+}
+
+func getdecoder() *zstd.Decoder {
+       initDecoder.Do(func() {
+               dec, _ = zstd.NewReader(nil)
+       })
+       return dec
+}
+
+func (zstdCodec) Decode(dst, src []byte) []byte {
+       dst, err := getdecoder().DecodeAll(src, dst[:0])
+       if err != nil {
+               panic(err)
+       }
+       return dst
+}
+
+func (z *zstdcloser) Close() error {
+       z.Decoder.Close()
+       return nil
+}
+
+func (zstdCodec) NewReader(r io.Reader) io.ReadCloser {
+       ret, _ := zstd.NewReader(r)
+       return &zstdcloser{ret}
+}
+
+func (zstdCodec) NewWriter(w io.Writer) io.WriteCloser {
+       ret, _ := zstd.NewWriter(w)
+       return ret
+}
+
+func (zstdCodec) NewWriterLevel(w io.Writer, level int) (io.WriteCloser, error) {
+       var compressLevel zstd.EncoderLevel
+       if level == DefaultCompressionLevel {
+               compressLevel = zstd.SpeedDefault
+       } else {
+               compressLevel = zstd.EncoderLevelFromZstd(level)
+       }
+       return zstd.NewWriter(w, zstd.WithEncoderLevel(compressLevel))
+}
+
+func (z zstdCodec) Encode(dst, src []byte) []byte {
+       return getencoder().EncodeAll(src, dst[:0])
+}
+
+func (z zstdCodec) EncodeLevel(dst, src []byte, level int) []byte {
+       compressLevel := zstd.EncoderLevelFromZstd(level)
+       if level == DefaultCompressionLevel {
+               compressLevel = zstd.SpeedDefault
+       }
+       enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(compressLevel))
+       return enc.EncodeAll(src, dst[:0])
+}
+
+// from zstd.h

Review comment:
       Function name is useful here, too.
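       For what it's worth, the routine in zstd.h is ZSTD_compressBound (the ZSTD_COMPRESSBOUND macro). If the cut-off Go body mirrors it, which is an assumption on my part, the named version would look roughly like:

```go
// from zstd.h: ZSTD_COMPRESSBOUND / ZSTD_compressBound()
func (zstdCodec) CompressBound(len int64) int64 {
	extra := int64(0)
	if len < (128 << 10) {
		// small inputs get some additional margin in the upstream macro
		extra = ((128 << 10) - len) >> 11
	}
	return len + (len >> 8) + extra
}
```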

##########
File path: go/parquet/compress/compress.go
##########
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package compress contains the interfaces and implementations for handling compression/decompression
+// of parquet data at the column levels.
+package compress
+
+import (
+       "compress/flate"
+       "io"
+       "io/ioutil"
+
+       "github.com/apache/arrow/go/parquet/internal/gen-go/parquet"
+)
+
+// Compression is an alias to the thrift compression codec enum type for easy use
+type Compression parquet.CompressionCodec
+
+func (c Compression) String() string {
+       return parquet.CompressionCodec(c).String()
+}
+
+// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries
+// use that to denote "use the default".
+const DefaultCompressionLevel = flate.DefaultCompression
+
+// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use
+// which make for easy internal swapping between them and the thrift enum since they are initialized to the same
+// constant values.
+var Codecs = struct {
+       Uncompressed Compression
+       Snappy       Compression
+       Gzip         Compression
+       // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang.
+       Lzo    Compression
+       Brotli Compression
+       // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4
+       // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E
+       Lz4  Compression
+       Zstd Compression
+}{
+       Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED),
+       Snappy:       Compression(parquet.CompressionCodec_SNAPPY),
+       Gzip:         Compression(parquet.CompressionCodec_GZIP),
+       Lzo:          Compression(parquet.CompressionCodec_LZO),
+       Brotli:       Compression(parquet.CompressionCodec_BROTLI),
+       Lz4:          Compression(parquet.CompressionCodec_LZ4),
+       Zstd:         Compression(parquet.CompressionCodec_ZSTD),
+}
+
+// Codec is an interface which is implemented for each compression type in order to make the interactions easy to
+// implement. Most consumers won't be calling GetCodec directly.
+type Codec interface {
+       // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data
+       NewReader(io.Reader) io.ReadCloser
+       // NewWriter provides a wrapper around a write stream to compress data before writing it.
+       NewWriter(io.Writer) io.WriteCloser
+       // NewWriterLevel is like NewWrapper but allows specifying the compression level
+       NewWriterLevel(io.Writer, int) (io.WriteCloser, error)
+       // Encode encodes a block of data given by src and returns the compressed block. dst needs to be either nil
+       // or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not
+       // overlap since some of the compression types don't allow it.
+       //
+       // The returned slice *might* be a slice of dst if it was able to fit the whole compressed data in it.

Review comment:
       nit: this seems to disagree with the comment above. It would imply that a third option is that dst will be used if it is sized appropriately, but otherwise new memory would be allocated.
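       One way to reconcile the two comments, suggested wording only (it matches what the brotli and zstd implementations appear to do with dst):

```go
	// Encode compresses src and returns the compressed block. If dst is non-nil
	// and cap(dst) >= CompressBound(int64(len(src))), the compressed data is
	// written into dst and the returned slice aliases it; otherwise a new slice
	// is allocated and returned. dst and src must not overlap.
	Encode(dst, src []byte) []byte
```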




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

