zeroshade commented on a change in pull request #10379: URL: https://github.com/apache/arrow/pull/10379#discussion_r647566763
########## File path: go/parquet/internal/encoding/decoder.go ########## @@ -0,0 +1,178 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package encoding + +import ( + "bytes" + "reflect" + + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/apache/arrow/go/parquet/internal/utils" + "github.com/apache/arrow/go/parquet/schema" +) + +// DecoderTraits provides an interface for more easily interacting with types +// to generate decoders for specific types. +type DecoderTraits interface { + Decoder(parquet.Encoding, *schema.Column, bool, memory.Allocator) TypedDecoder + BytesRequired(int) int +} + +// NewDecoder constructs a decoder for a given type and encoding, or returns nil if no decoding traits exist for the type. +func NewDecoder(t parquet.Type, e parquet.Encoding, descr *schema.Column, mem memory.Allocator) TypedDecoder { + traits := getDecodingTraits(t) + if traits == nil { + return nil + } + + return traits.Decoder(e, descr, false, mem) +} + +// NewDictDecoder is like NewDecoder but for dictionary encodings, panics if type is bool.
+// +// If mem is nil, memory.DefaultAllocator is used. Returns nil if no decoding traits exist for t, and panics if the constructed decoder does not implement DictDecoder. +func NewDictDecoder(t parquet.Type, descr *schema.Column, mem memory.Allocator) DictDecoder { + traits := getDecodingTraits(t) + if traits == nil { + return nil + } + + if mem == nil { + mem = memory.DefaultAllocator + } + + return traits.Decoder(parquet.Encodings.RLEDict, descr, true, mem).(DictDecoder) +} + +type decoder struct { + descr *schema.Column + encoding format.Encoding + nvals int + data []byte + typeLen int +} + +// newDecoderBase constructs the base decoding object that is embedded in the +// type-specific decoders. typeLen is the column's TypeLength for FixedLenByteArray columns and -1 otherwise. +func newDecoderBase(e format.Encoding, descr *schema.Column) decoder { + typeLen := -1 + if descr != nil && descr.PhysicalType() == parquet.Types.FixedLenByteArray { + typeLen = int(descr.TypeLength()) + } + + return decoder{ + descr: descr, + encoding: e, + typeLen: typeLen, + } +} + +// SetData sets the data for decoding into the decoder to update the available +// data bytes and number of values available. +func (d *decoder) SetData(nvals int, data []byte) { + d.data = data + d.nvals = nvals +} + +// ValuesLeft returns the number of remaining values that can be decoded. +func (d *decoder) ValuesLeft() int { return d.nvals } + +// Encoding returns the encoding type used by this decoder to decode the bytes. +func (d *decoder) Encoding() parquet.Encoding { return parquet.Encoding(d.encoding) } + +type dictDecoder struct { + decoder + mem memory.Allocator + dictValueDecoder utils.DictionaryConverter + idxDecoder *utils.RleDecoder +} + +// SetDict sets a decoder that can be used to decode the dictionary that is +// used for this column in order to return the proper values; it panics if dict's physical type differs from the column's. +func (d *dictDecoder) SetDict(dict TypedDecoder) { + if dict.Type() != d.descr.PhysicalType() { + panic("parquet: mismatch dictionary and column data type") + } + + d.dictValueDecoder = NewDictConverter(dict) +} + +// SetData sets the index value data into the decoder.
+func (d *dictDecoder) SetData(nvals int, data []byte) { + d.nvals = nvals + if len(data) == 0 { + d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data), 1) + return + } + + width := uint8(data[0]) + if width >= 64 { + panic("parquet: invalid or corrupted bit width") + } + + d.idxDecoder = utils.NewRleDecoder(bytes.NewReader(data[1:]), int(width)) +} + +func (d *dictDecoder) decode(out interface{}) (int, error) { + return d.idxDecoder.GetBatchWithDict(d.dictValueDecoder, out) +} + +func (d *dictDecoder) decodeSpaced(out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { + return d.idxDecoder.GetBatchWithDictSpaced(d.dictValueDecoder, out, nullCount, validBits, validBitsOffset) +} + +var empty = [1]byte{0} + +// spacedExpand is used to take a slice of data and utilize the bitmap provided to fill in nulls into the +// correct slots according to the bitmap in order to produce a fully expanded result slice with nulls +// in the correct slots. +func spacedExpand(buffer interface{}, nullCount int, validBits []byte, validBitsOffset int64) int { + bufferRef := reflect.ValueOf(buffer) + if bufferRef.Kind() != reflect.Slice { + panic("invalid spacedexpand type, not slice") + } + + var ( + numValues int = bufferRef.Len() + ) + + idxDecode := int32(numValues - nullCount) + if idxDecode == 0 { // if there's nothing to decode there's nothing to do. + return numValues + } + + // read the bitmap in reverse grabbing runs of valid bits where possible. 
+ rdr := utils.NewReverseSetBitRunReader(validBits, validBitsOffset, int64(numValues)) + for { + run := rdr.NextRun() + if run.Length == 0 { + break + } + + // copy data from the end of the slice to it's proper location in the slice after accounting for the nulls + // because we technically don't care what is in the null slots we don't actually have to clean + // up after ourselves because we're doing this in reverse to guarantee that we'll always simply + // overwrite any existing data with the correctly spaced data. Any data that happens to be left in the null + // slots is fine since it shouldn't matter and saves us work. + idxDecode -= int32(run.Length) Review comment: total length? or just the run.Length? The total length, i.e. `bufferRef.Len()` above, is an `int`, so it is 64 bits on 64-bit architectures. Rather than add a comparison against the int32 max inside this loop — it is a low-level tight loop, so that check would cost more than simply widening the type — I'd prefer to make `idxDecode` an `int64`, which removes the potential for int32 overflow. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
