emkornfield commented on a change in pull request #9817: URL: https://github.com/apache/arrow/pull/9817#discussion_r609822671
########## File path: go/parquet/compress/brotli.go ########## @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compress + +import ( + "bytes" + "io" + "io/ioutil" + + "github.com/andybalholm/brotli" +) + +type brotliCodec struct{} + +func (brotliCodec) NewReader(r io.Reader) io.ReadCloser { + return ioutil.NopCloser(brotli.NewReader(r)) +} + +func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte { + if level == DefaultCompressionLevel { + level = brotli.DefaultCompression + } + + maxlen := int(b.CompressBound(int64(len(src)))) + if dst == nil || cap(dst) < maxlen { + dst = make([]byte, 0, maxlen) + } + buf := bytes.NewBuffer(dst[:0]) + w := brotli.NewWriterLevel(buf, level) + _, err := w.Write(src) + if err != nil { + panic(err) + } + if err := w.Close(); err != nil { + panic(err) + } + return buf.Bytes() +} + +func (b brotliCodec) Encode(dst, src []byte) []byte { + return b.EncodeLevel(dst, src, brotli.DefaultCompression) +} + +func (brotliCodec) Decode(dst, src []byte) []byte { + rdr := brotli.NewReader(bytes.NewReader(src)) + if dst != nil { + var ( + sofar = 0 + n = -1 + err error = nil + ) + for n != 0 && err == nil { + n, err = rdr.Read(dst[sofar:]) + sofar += n + } + if err != nil && err != io.EOF { + panic(err) + } + return dst[:sofar] + } + + dst, err := ioutil.ReadAll(rdr) + if err != nil { + panic(err) + } + + return dst +} + +// taken from brotli/enc/encode.c Review comment: please provide function/macro name if possible. ########## File path: go/parquet/compress/compress.go ########## @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package compress contains the interfaces and implementations for handling compression/decompression +// of parquet data at the column levels. 
+package compress + +import ( + "compress/flate" + "io" + "io/ioutil" + + "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// Compression is an alias to the thrift compression codec enum type for easy use +type Compression parquet.CompressionCodec + +func (c Compression) String() string { + return parquet.CompressionCodec(c).String() +} + +// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries +// use that to denote "use the default". +const DefaultCompressionLevel = flate.DefaultCompression + +// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use +// which makes for easy internal swapping between them and the thrift enum since they are initialized to the same +// constant values. +var Codecs = struct { + Uncompressed Compression + Snappy Compression + Gzip Compression + // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang. + Lzo Compression + Brotli Compression + // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4 + // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E + Lz4 Compression + Zstd Compression +}{ + Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED), + Snappy: Compression(parquet.CompressionCodec_SNAPPY), + Gzip: Compression(parquet.CompressionCodec_GZIP), + Lzo: Compression(parquet.CompressionCodec_LZO), + Brotli: Compression(parquet.CompressionCodec_BROTLI), + Lz4: Compression(parquet.CompressionCodec_LZ4), + Zstd: Compression(parquet.CompressionCodec_ZSTD), +} + +// Codec is an interface which is implemented for each compression type in order to make the interactions easy to +// implement. Most consumers won't be calling GetCodec directly. +type Codec interface { + // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data + NewReader(io.Reader) io.ReadCloser + // NewWriter provides a wrapper around a write stream to compress data before writing it. + NewWriter(io.Writer) io.WriteCloser + // NewWriterLevel is like NewWriter but allows specifying the compression level + NewWriterLevel(io.Writer, int) (io.WriteCloser, error) Review comment: API comment. Is it worth exposing the compression level in the API? An alternative would have been a member used at construction time. ########## File path: go/parquet/compress/brotli.go ########## @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
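To make the construction-time alternative in the comment above concrete, here is a minimal sketch. It uses only the standard library's gzip rather than anything from this PR, and leveledCodec / newLeveledCodec are illustrative names, not proposed API:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// leveledCodec binds the compression level once, at construction time,
// so the writer-creating method does not need a level parameter at all.
type leveledCodec struct {
	level int
}

func newLeveledCodec(level int) leveledCodec {
	return leveledCodec{level: level}
}

// NewWriter always compresses at the pre-configured level.
func (c leveledCodec) NewWriter(w io.Writer) (io.WriteCloser, error) {
	return gzip.NewWriterLevel(w, c.level)
}

func main() {
	var buf bytes.Buffer
	codec := newLeveledCodec(gzip.BestSpeed)
	wr, err := codec.NewWriter(&buf)
	if err != nil {
		panic(err)
	}
	if _, err := wr.Write([]byte("some column data")); err != nil {
		panic(err)
	}
	wr.Close()
	fmt.Printf("compressed to %d bytes\n", buf.Len())
}
```

The trade-off is that a per-call level, as in the PR's NewWriterLevel, lets one codec instance serve several levels, while a construction-time member keeps the interface itself smaller.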
+ +package compress + +import ( + "bytes" + "io" + "io/ioutil" + + "github.com/andybalholm/brotli" +) + +type brotliCodec struct{} + +func (brotliCodec) NewReader(r io.Reader) io.ReadCloser { + return ioutil.NopCloser(brotli.NewReader(r)) +} + +func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte { + if level == DefaultCompressionLevel { + level = brotli.DefaultCompression + } + + maxlen := int(b.CompressBound(int64(len(src)))) + if dst == nil || cap(dst) < maxlen { + dst = make([]byte, 0, maxlen) + } + buf := bytes.NewBuffer(dst[:0]) + w := brotli.NewWriterLevel(buf, level) + _, err := w.Write(src) + if err != nil { + panic(err) + } + if err := w.Close(); err != nil { + panic(err) + } + return buf.Bytes() +} + +func (b brotliCodec) Encode(dst, src []byte) []byte { + return b.EncodeLevel(dst, src, brotli.DefaultCompression) +} + +func (brotliCodec) Decode(dst, src []byte) []byte { + rdr := brotli.NewReader(bytes.NewReader(src)) + if dst != nil { + var ( + sofar = 0 + n = -1 + err error = nil + ) + for n != 0 && err == nil { + n, err = rdr.Read(dst[sofar:]) + sofar += n + } + if err != nil && err != io.EOF { + panic(err) + } + return dst[:sofar] + } + + dst, err := ioutil.ReadAll(rdr) + if err != nil { + panic(err) + } + + return dst +} + +// taken from brotli/enc/encode.c +func (brotliCodec) CompressBound(len int64) int64 { + // [window bits / empty metadata] + N * [uncompressed] + [last empty] Review comment: maybe assert len > 0 ########## File path: go/parquet/types.go ########## @@ -165,3 +169,186 @@ func (fixedLenByteArrayTraits) CastFromBytes(b []byte) []FixedLenByteArray { return res } + +// Creating our own enums allows avoiding the transitive dependency on the +// compiled thrift definitions in the public API, allowing us to not export +// the entire Thrift definitions, while making everything a simple cast between. +// +// It also let's us add special values like NONE to distinguish between values +// that are set or not set +type ( + // Type is the physical type as in parquet.thrift + Type format.Type + // Cipher is the parquet Cipher Algorithms + Cipher int + // ColumnOrder is the Column Order from the parquet.thrift + ColumnOrder *format.ColumnOrder + // Version is the parquet version type + Version int8 + // DataPageVersion is the version of the Parquet Data Pages + DataPageVersion int8 + // Encoding is the parquet Encoding type + Encoding format.Encoding + // Repetition is the underlying parquet field repetition type as in parquet.thrift + Repetition format.FieldRepetitionType + // ColumnPath is the path from the root of the schema to a given column + ColumnPath []string +) + +func (c ColumnPath) String() string { + if c == nil { + return "" + } + return strings.Join(c, ".") +} + +// Extend creates a new ColumnPath from an existing one, with the new ColumnPath having s appended to the end. 
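Regarding the two brotli comments above (naming the source of the bound and guarding the length): the routine being mirrored is most likely BrotliEncoderMaxCompressedSize from brotli's c/enc/encode.c, though that is worth confirming before citing it in the comment. Since the Go caller passes len(src), the length can never be negative in practice, so the guard below is a belt-and-braces sketch; maxCompressedLen and its formula are illustrative stand-ins, not the PR's code:

```go
package main

import "fmt"

// maxCompressedLen is an illustrative stand-in for a CompressBound-style
// helper; the point of the sketch is the input guard, not the formula.
func maxCompressedLen(n int64) int64 {
	if n < 0 {
		// A negative length can only come from a caller bug, so fail loudly
		// instead of returning a meaningless bound.
		panic(fmt.Sprintf("compress: invalid uncompressed length %d", n))
	}
	if n == 0 {
		// Even empty input needs a couple of bytes of stream framing.
		return 2
	}
	// ... the real window-bits/metadata/empty-block arithmetic would go here ...
	return n + n/255 + 16 // placeholder bound, not brotli's actual formula
}

func main() {
	fmt.Println(maxCompressedLen(1 << 20))
}
```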
+func (c ColumnPath) Extend(s string) ColumnPath { + p := make([]string, len(c), len(c)+1) + copy(p, c) + return append(p, s) +} + +// ColumnPathFromString constructs a ColumnPath from a dot separated string +func ColumnPathFromString(s string) ColumnPath { + return strings.Split(s, ".") +} + +// constants for choosing the Aes Algorithm to use for encryption/decryption +const ( + AesGcm Cipher = iota + AesCtr +) + +// Constants for the parquet Version +const ( + V1 Version = 1 + V2 Version = 2 +) + +// constants for the parquet DataPage Version to use +const ( + DataPageV1 DataPageVersion = iota + DataPageV2 +) + +func (e Encoding) String() string { + return format.Encoding(e).String() +} + +var ( + // Types contains constants for the Physical Types that are used in the Parquet Spec + // + // They can be specified when needed as such: `parquet.Types.Int32` etc. The values + // all correspond to the values in parquet.thrift + Types = struct { + Boolean Type + Int32 Type + Int64 Type + Int96 Type + Float Type + Double Type + ByteArray Type + FixedLenByteArray Type + // this only exists as a convienence so we can denote it when necessary + // nearly all functions that take a parquet.Type will error/panic if given + // Undefined + Undefined Type + }{ + Boolean: Type(format.Type_BOOLEAN), + Int32: Type(format.Type_INT32), + Int64: Type(format.Type_INT64), + Int96: Type(format.Type_INT96), + Float: Type(format.Type_FLOAT), + Double: Type(format.Type_DOUBLE), + ByteArray: Type(format.Type_BYTE_ARRAY), + FixedLenByteArray: Type(format.Type_FIXED_LEN_BYTE_ARRAY), + Undefined: Type(format.Type_FIXED_LEN_BYTE_ARRAY + 1), + } + + // Encodings contains constants for the encoding types of the column data + // + // The values used all correspond to the values in parquet.thrift for the + // corresponding encoding type. + Encodings = struct { + Plain Encoding + PlainDict Encoding + RLE Encoding + RLEDict Encoding + BitPacked Encoding // deprecated, not implemented + DeltaByteArray Encoding + DeltaBinaryPacked Encoding + DeltaLengthByteArray Encoding + }{ + Plain: Encoding(format.Encoding_PLAIN), + PlainDict: Encoding(format.Encoding_PLAIN_DICTIONARY), + RLE: Encoding(format.Encoding_RLE), + RLEDict: Encoding(format.Encoding_RLE_DICTIONARY), + BitPacked: Encoding(format.Encoding_BIT_PACKED), + DeltaByteArray: Encoding(format.Encoding_DELTA_BYTE_ARRAY), + DeltaBinaryPacked: Encoding(format.Encoding_DELTA_BINARY_PACKED), + DeltaLengthByteArray: Encoding(format.Encoding_DELTA_LENGTH_BYTE_ARRAY), + } + + // ColumnOrders contains constants for the Column Ordering fields + ColumnOrders = struct { + Undefined ColumnOrder + TypeDefinedOrder ColumnOrder + }{ + Undefined: format.NewColumnOrder(), + TypeDefinedOrder: &format.ColumnOrder{TYPE_ORDER: format.NewTypeDefinedOrder()}, + } + + // DefaultColumnOrder is to use TypeDefinedOrder + DefaultColumnOrder = ColumnOrders.TypeDefinedOrder + + // Repetitions contains the constants for Field Repetition Types + Repetitions = struct { + Required Repetition + Optional Repetition + Repeated Repetition + Undefined Repetition // convenience value Review comment: it might pay to distinguish beween undefined and not set. An issue was raised recently that at least in C++ we write Repetition required for the root of the schema when according to the spec we probably shouldn't. ########## File path: go/parquet/compress/compress.go ########## @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package compress contains the interfaces and implementations for handling compression/decompression +// of parquet data at the column levels. +package compress + +import ( + "compress/flate" + "io" + "io/ioutil" + + "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// Compression is an alias to the thrift compression codec enum type for easy use +type Compression parquet.CompressionCodec + +func (c Compression) String() string { + return parquet.CompressionCodec(c).String() +} + +// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries +// use that to denote "use the default". +const DefaultCompressionLevel = flate.DefaultCompression + +// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use +// which makes for easy internal swapping between them and the thrift enum since they are initialized to the same +// constant values. +var Codecs = struct { + Uncompressed Compression + Snappy Compression + Gzip Compression + // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang. + Lzo Compression Review comment: I forget if I already commented, but chances are you won't find one without potential licensing concerns (I think LZO is GPL, which I believe is incompatible with the ASL). ########## File path: go/parquet/writer_properties.go ########## @@ -0,0 +1,510 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parquet + +import ( + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet/compress" +) + +// Constants for default property values used for the default reader, writer and column props. +const ( + // Default Buffer size used for the Reader + DefaultBufSize int64 = 4096 * 4 + // Default data page size limit is 1MB; it's not guaranteed, but we will try to + // cut data pages off at this size where possible. + DefaultDataPageSize int64 = 1024 * 1024 + // Default is for dictionary encoding to be turned on; use the WithDictionaryDefault + // writer property to change that.
+ DefaultDictionaryEnabled = true + // If the dictionary reaches the size of this limitation, the writer will use + // the fallback encoding (usually plain) instead of continuing to build the + // dictionary index. + DefaultDictionaryPageSizeLimit = DefaultDataPageSize + // In order to attempt to facilitate data page size limits for writing, + // data is written in batches. Increasing the batch size may improve performance + // but the larger the batch size, the easier it is to overshoot the datapage limit. + DefaultWriteBatchSize int64 = 1024 + // Default maximum number of rows for a single row group + DefaultMaxRowGroupLen int64 = 64 * 1024 * 1024 + // Default is to have stats enabled for all columns, use writer properties to + // change the default, or to enable/disable for specific columns. + DefaultStatsEnabled = true + // If the stats are larger than 4K, the writer will skip writing them out anyway. + DefaultMaxStatsSize int64 = 4096 + DefaultCreatedBy = "parquet-go version 1.0.0" +) + +// ColumnProperties defines the encoding, codec, and so on for a given column. +type ColumnProperties struct { + Encoding Encoding + Codec compress.Compression + DictionaryEnabled bool + StatsEnabled bool + MaxStatsSize int64 + CompressionLevel int +} + +// DefaultColumnProperties returns the default properties which get utilized for writing. +// +// The default column properties are the following constants: +// Encoding: Encodings.Plain +// Codec: compress.Codecs.Uncompressed +// DictionaryEnabled: DefaultDictionaryEnabled +// StatsEnabled: DefaultStatsEnabled +// MaxStatsSize: DefaultMaxStatsSize +// CompressionLevel: compress.DefaultCompressionLevel +func DefaultColumnProperties() ColumnProperties { + return ColumnProperties{ + Encoding: Encodings.Plain, + Codec: compress.Codecs.Uncompressed, + DictionaryEnabled: DefaultDictionaryEnabled, + StatsEnabled: DefaultStatsEnabled, + MaxStatsSize: DefaultMaxStatsSize, + CompressionLevel: compress.DefaultCompressionLevel, + } +} + +type writerPropConfig struct { + wr *WriterProperties + encodings map[string]Encoding + codecs map[string]compress.Compression + compressLevel map[string]int + dictEnabled map[string]bool + statsEnabled map[string]bool +} + +// WriterProperty is used as an option for building a writer properties instance +type WriterProperty func(*writerPropConfig) + +// WithAllocator specifies the writer to use the given allocator +func WithAllocator(mem memory.Allocator) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.wr.mem = mem + } +} + +// WithDictionaryDefault sets the default value for whether to enable dictionary encoding +func WithDictionaryDefault(dict bool) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.wr.defColumnProps.DictionaryEnabled = dict + } +} + +// WithDictionaryFor allows enabling or disabling dictionary encoding for a given column path string +func WithDictionaryFor(path string, dict bool) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.dictEnabled[path] = dict + } +} + +// WithDictionaryPath is like WithDictionaryFor, but takes a ColumnPath type +func WithDictionaryPath(path ColumnPath, dict bool) WriterProperty { + return WithDictionaryFor(path.String(), dict) +} + +// WithDictionaryPageSizeLimit is the limit of the dictionary at which the writer +// will fall back to plain encoding instead +func WithDictionaryPageSizeLimit(limit int64) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.wr.dictPagesize = limit + } +} + +// WithBatchSize specifies the number of rows
to use for batch writes to columns +func WithBatchSize(batch int64) WriterProperty { + return func(cfg *writerPropConfig) { + cfg.wr.batchSize = batch + } +} + +// WithMaxRowGroupLength specifies the number of rows as the maximum number of rows for a given row group in the writer. +func WithMaxRowGroupLength(nrows int64) WriterProperty { Review comment: not sure if this was mirrored on the C++ implementation intentionally, just wanted to check that this pattern is idiomatic go? ########## File path: go/parquet/compress/brotli.go ########## @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compress + +import ( + "bytes" + "io" + "io/ioutil" + + "github.com/andybalholm/brotli" +) + +type brotliCodec struct{} + +func (brotliCodec) NewReader(r io.Reader) io.ReadCloser { + return ioutil.NopCloser(brotli.NewReader(r)) +} + +func (b brotliCodec) EncodeLevel(dst, src []byte, level int) []byte { + if level == DefaultCompressionLevel { + level = brotli.DefaultCompression + } + + maxlen := int(b.CompressBound(int64(len(src)))) + if dst == nil || cap(dst) < maxlen { + dst = make([]byte, 0, maxlen) + } + buf := bytes.NewBuffer(dst[:0]) + w := brotli.NewWriterLevel(buf, level) + _, err := w.Write(src) + if err != nil { + panic(err) + } + if err := w.Close(); err != nil { + panic(err) + } + return buf.Bytes() +} + +func (b brotliCodec) Encode(dst, src []byte) []byte { + return b.EncodeLevel(dst, src, brotli.DefaultCompression) +} + +func (brotliCodec) Decode(dst, src []byte) []byte { + rdr := brotli.NewReader(bytes.NewReader(src)) + if dst != nil { + var ( + sofar = 0 + n = -1 + err error = nil + ) + for n != 0 && err == nil { + n, err = rdr.Read(dst[sofar:]) + sofar += n + } + if err != nil && err != io.EOF { + panic(err) + } + return dst[:sofar] + } + + dst, err := ioutil.ReadAll(rdr) + if err != nil { + panic(err) + } + + return dst +} + +// taken from brotli/enc/encode.c Review comment: i find it curious that there are at least two Go bindings to compression libraries that don't give this out of the box. ########## File path: go/parquet/reader_properties.go ########## @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
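On the question above about whether this option-function style is idiomatic Go: it matches the common "functional options" pattern. A minimal generic sketch of the idiom, independent of this PR (config, Option, WithBatch, WithDict and New are illustrative names only):

```go
package main

import "fmt"

// config is the struct being assembled; options mutate it in place.
type config struct {
	batchSize int64
	dict      bool
}

// Option is the functional-option type: a closure that tweaks the config.
type Option func(*config)

func WithBatch(n int64) Option { return func(c *config) { c.batchSize = n } }
func WithDict(on bool) Option  { return func(c *config) { c.dict = on } }

// New starts from defaults and applies each option in order.
func New(opts ...Option) *config {
	cfg := &config{batchSize: 1024, dict: true}
	for _, o := range opts {
		o(cfg)
	}
	return cfg
}

func main() {
	cfg := New(WithBatch(4096), WithDict(false))
	fmt.Printf("%+v\n", cfg)
}
```

The usual appeal is that new options can be added later without changing the constructor's signature, and the defaults live in one place.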
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parquet + +import ( + "bytes" + "io" + + "github.com/apache/arrow/go/arrow/ipc" + "github.com/apache/arrow/go/arrow/memory" + "golang.org/x/xerrors" +) + +// ReaderProperties are used to define how the file reader will handle buffering and allocating buffers +type ReaderProperties struct { + alloc memory.Allocator + // Default buffer size to utilize when reading chunks + BufferSize int64 + // create with NewFileDecryptionProperties if dealing with an encrypted file + FileDecryptProps *FileDecryptionProperties + // If this is set to true, then the reader will use SectionReader to + // just use the read stream when reading data. Otherwise we will buffer + // the data we're going to read into memory first and then read that buffer. + BufferedStreamEnabled bool Review comment: it might pay to add a little more commentary here on why one would choose one over the other. ########## File path: go/parquet/compress/zstd.go ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
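On the BufferedStreamEnabled comment above, a rough sketch of the trade-off that the extra commentary could spell out: streaming reads the chunk lazily from the underlying file (flat memory use, more small reads), while buffering pays one large read up front and then serves everything from memory. The streamed/buffered helpers below are illustrative, not the PR's API:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

// streamed reads the chunk lazily from the file as it is consumed:
// flat memory use, but every Read goes back to the underlying reader.
func streamed(f *os.File, off, n int64) io.Reader {
	return io.NewSectionReader(f, off, n)
}

// buffered pays one large up-front read and then serves the chunk from RAM:
// more memory, but no further I/O while decoding the pages.
func buffered(f *os.File, off, n int64) (io.Reader, error) {
	buf := make([]byte, n)
	if _, err := f.ReadAt(buf, off); err != nil {
		return nil, err
	}
	return bytes.NewReader(buf), nil
}

func main() {
	f, err := ioutil.TempFile("", "chunk")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("pretend this is a column chunk")

	s := streamed(f, 8, 4)
	b, _ := buffered(f, 8, 4)
	got1, _ := ioutil.ReadAll(s)
	got2, _ := ioutil.ReadAll(b)
	fmt.Println(string(got1), string(got2)) // both print "this"
}
```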
+ +package compress + +import ( + "io" + "sync" + + "github.com/klauspost/compress/zstd" +) + +type zstdCodec struct{} + +type zstdcloser struct { + *zstd.Decoder +} + +var ( + enc *zstd.Encoder + dec *zstd.Decoder + initEncoder sync.Once + initDecoder sync.Once +) + +func getencoder() *zstd.Encoder { + initEncoder.Do(func() { + enc, _ = zstd.NewWriter(nil, zstd.WithZeroFrames(true)) + }) + return enc +} + +func getdecoder() *zstd.Decoder { + initDecoder.Do(func() { + dec, _ = zstd.NewReader(nil) + }) + return dec +} + +func (zstdCodec) Decode(dst, src []byte) []byte { + dst, err := getdecoder().DecodeAll(src, dst[:0]) + if err != nil { + panic(err) + } + return dst +} + +func (z *zstdcloser) Close() error { + z.Decoder.Close() + return nil +} + +func (zstdCodec) NewReader(r io.Reader) io.ReadCloser { + ret, _ := zstd.NewReader(r) + return &zstdcloser{ret} +} + +func (zstdCodec) NewWriter(w io.Writer) io.WriteCloser { + ret, _ := zstd.NewWriter(w) + return ret +} + +func (zstdCodec) NewWriterLevel(w io.Writer, level int) (io.WriteCloser, error) { + var compressLevel zstd.EncoderLevel + if level == DefaultCompressionLevel { + compressLevel = zstd.SpeedDefault + } else { + compressLevel = zstd.EncoderLevelFromZstd(level) + } + return zstd.NewWriter(w, zstd.WithEncoderLevel(compressLevel)) +} + +func (z zstdCodec) Encode(dst, src []byte) []byte { + return getencoder().EncodeAll(src, dst[:0]) +} + +func (z zstdCodec) EncodeLevel(dst, src []byte, level int) []byte { + compressLevel := zstd.EncoderLevelFromZstd(level) + if level == DefaultCompressionLevel { + compressLevel = zstd.SpeedDefault + } + enc, _ := zstd.NewWriter(nil, zstd.WithZeroFrames(true), zstd.WithEncoderLevel(compressLevel)) + return enc.EncodeAll(src, dst[:0]) +} + +// from zstd.h Review comment: function name is useful, here too ########## File path: go/parquet/compress/compress.go ########## @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package compress contains the interfaces and implementations for handling compression/decompression +// of parquet data at the column levels. +package compress + +import ( + "compress/flate" + "io" + "io/ioutil" + + "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// Compression is an alias to the thrift compression codec enum type for easy use +type Compression parquet.CompressionCodec + +func (c Compression) String() string { + return parquet.CompressionCodec(c).String() +} + +// DefaultCompressionLevel will use flate.DefaultCompression since many of the compression libraries +// use that to denote "use the default". 
+const DefaultCompressionLevel = flate.DefaultCompression + +// Codecs is a useful struct to provide namespaced enum values to use for specifying the compression type to use +// which makes for easy internal swapping between them and the thrift enum since they are initialized to the same +// constant values. +var Codecs = struct { + Uncompressed Compression + Snappy Compression + Gzip Compression + // LZO is unsupported in this library as I haven't yet found a good implementation of it for golang. + Lzo Compression + Brotli Compression + // LZ4 unsupported in this library due to problematic issues between the Hadoop LZ4 spec vs regular lz4 + // see: http://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3ccaari41v24xua8mghldvgsne+7aagohieukemw_opnhmvfmm...@mail.gmail.com%3E + Lz4 Compression + Zstd Compression +}{ + Uncompressed: Compression(parquet.CompressionCodec_UNCOMPRESSED), + Snappy: Compression(parquet.CompressionCodec_SNAPPY), + Gzip: Compression(parquet.CompressionCodec_GZIP), + Lzo: Compression(parquet.CompressionCodec_LZO), + Brotli: Compression(parquet.CompressionCodec_BROTLI), + Lz4: Compression(parquet.CompressionCodec_LZ4), + Zstd: Compression(parquet.CompressionCodec_ZSTD), +} + +// Codec is an interface which is implemented for each compression type in order to make the interactions easy to +// implement. Most consumers won't be calling GetCodec directly. +type Codec interface { + // NewReader provides a reader that wraps a stream with compressed data to stream the uncompressed data + NewReader(io.Reader) io.ReadCloser + // NewWriter provides a wrapper around a write stream to compress data before writing it. + NewWriter(io.Writer) io.WriteCloser + // NewWriterLevel is like NewWriter but allows specifying the compression level + NewWriterLevel(io.Writer, int) (io.WriteCloser, error) + // Encode encodes a block of data given by src and returns the compressed block. dst needs to be either nil + // or sized large enough to fit the compressed block (use CompressBound to allocate). dst and src should not + // overlap since some of the compression types don't allow it. + // + // The returned slice *might* be a slice of dst if it was able to fit the whole compressed data in it. Review comment: nit: this seems to disagree with the comment above. It would imply a third option: dst will be used if it is sized appropriately, but otherwise new memory will be allocated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
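To illustrate the contract the nit above is about, here is a hedged sketch of how the "third option" would read: dst is reused when it has enough capacity (sized via the bound), otherwise a fresh buffer is allocated, and callers must always use the returned slice. The bound/encode helpers below are stand-ins built on the standard library's gzip, not this PR's Codec:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

// bound is a stand-in for Codec.CompressBound: a worst-case compressed size
// for n input bytes (the formula is illustrative only).
func bound(n int) int { return n + n/10 + 64 }

// encode is a stand-in for Codec.Encode. If dst has enough capacity the
// compressed data lands in it and a slice of dst is returned; otherwise a new
// buffer is allocated. Either way, the caller should use the returned slice.
func encode(dst, src []byte) []byte {
	if cap(dst) < bound(len(src)) {
		dst = make([]byte, 0, bound(len(src)))
	}
	buf := bytes.NewBuffer(dst[:0])
	w := gzip.NewWriter(buf)
	if _, err := w.Write(src); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	return buf.Bytes()
}

func main() {
	src := bytes.Repeat([]byte("parquet "), 128)
	dst := make([]byte, 0, bound(len(src))) // caller pre-allocates using the bound
	out := encode(dst, src)
	fmt.Printf("in=%d bytes, out=%d bytes\n", len(src), len(out))
}
```

Whatever behavior is chosen, stating that promise explicitly in both the Encode and CompressBound doc comments would remove the apparent disagreement.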