This is an automated email from the ASF dual-hosted git repository. zeroshade pushed a commit to branch temp-parquet-pqarrow in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 879ca48f1a7ad0da6015a09c32eab64545fa77e9 Author: Matthew Topol <[email protected]> AuthorDate: Tue Nov 9 13:25:39 2021 -0500 update to latest arrow changes, fix issues --- go/parquet/file/level_conversion.go | 46 +- go/parquet/file/record_reader.go | 799 ++++++++++++++++++++++++++++++++ go/parquet/go.mod | 4 +- go/parquet/go.sum | 11 +- go/parquet/pqarrow/encode_arrow.go | 42 +- go/parquet/pqarrow/encode_arrow_test.go | 38 +- go/parquet/pqarrow/file_writer.go | 8 +- go/parquet/pqarrow/path_builder.go | 2 +- go/parquet/pqarrow/schema.go | 2 +- 9 files changed, 882 insertions(+), 70 deletions(-) diff --git a/go/parquet/file/level_conversion.go b/go/parquet/file/level_conversion.go index 5d55848..381bdbc 100644 --- a/go/parquet/file/level_conversion.go +++ b/go/parquet/file/level_conversion.go @@ -19,6 +19,7 @@ package file import ( "math" "math/bits" + "unsafe" "github.com/apache/arrow/go/parquet" "github.com/apache/arrow/go/parquet/internal/bmi" @@ -136,28 +137,37 @@ type ValidityBitmapInputOutput struct { } // create a bitmap out of the definition Levels and return the number of non-null values -func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) uint64 { - definedBitmap := bmi.GreaterThanBitmap(defLevels, info.DefLevel-1) +func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) (count uint64) { + const maxbatch = 8 * int(unsafe.Sizeof(uint64(0))) - if hasRepeatedParent { - // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the - // repeated_ancestor_def_level - presentBitmap := bmi.GreaterThanBitmap(defLevels, info.RepeatedAncestorDefLevel-1) - selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap) - selectedCount := int64(bits.OnesCount64(presentBitmap)) - if selectedCount > remainingUpperBound { - panic("values read exceeded upper bound") + var batch []int16 + for 
len(defLevels) > 0 { + batchSize := utils.MinInt(maxbatch, len(defLevels)) + batch, defLevels = defLevels[:batchSize], defLevels[batchSize:] + definedBitmap := bmi.GreaterThanBitmap(batch, info.DefLevel-1) + + if hasRepeatedParent { + // Greater than level_info.repeated_ancestor_def_level - 1 implies >= the + // repeated_ancestor_def_level + presentBitmap := bmi.GreaterThanBitmap(batch, info.RepeatedAncestorDefLevel-1) + selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap) + selectedCount := int64(bits.OnesCount64(presentBitmap)) + if selectedCount > remainingUpperBound { + panic("values read exceeded upper bound") + } + wr.AppendWord(selectedBits, selectedCount) + count += uint64(bits.OnesCount64(selectedBits)) + continue } - wr.AppendWord(selectedBits, selectedCount) - return uint64(bits.OnesCount64(selectedBits)) - } - if int64(len(defLevels)) > remainingUpperBound { - panic("values read exceed upper bound") - } + if int64(len(defLevels)) > remainingUpperBound { + panic("values read exceed upper bound") + } - wr.AppendWord(definedBitmap, int64(len(defLevels))) - return uint64(bits.OnesCount64(definedBitmap)) + wr.AppendWord(definedBitmap, int64(len(batch))) + count += uint64(bits.OnesCount64(definedBitmap)) + } + return } // create a bitmap out of the definition Levels diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go new file mode 100644 index 0000000..8f92512 --- /dev/null +++ b/go/parquet/file/record_reader.go @@ -0,0 +1,799 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file + +import ( + "sync/atomic" + "unsafe" + + "github.com/JohnCGriffin/overflow" + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/bitutil" + "github.com/apache/arrow/go/arrow/memory" + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/internal/encoding" + "github.com/apache/arrow/go/parquet/internal/utils" + "github.com/apache/arrow/go/parquet/schema" + "golang.org/x/xerrors" +) + +type RecordReader interface { + DefLevels() []int16 + LevelsPos() int64 + RepLevels() []int16 + Reset() + Reserve(int64) error + HasMore() bool + ReadRecords(num int64) (int64, error) + ValuesWritten() int + ReleaseValidBits() *memory.Buffer + ReleaseValues() *memory.Buffer + NullCount() int64 + Type() parquet.Type + Values() []byte + SetPageReader(PageReader) + Retain() + Release() +} + +type BinaryRecordReader interface { + RecordReader + GetBuilderChunks() []array.Interface +} + +type recordReaderImpl interface { + ColumnChunkReader + ReadValuesDense(int64) error + ReadValuesSpaced(int64, int64) error + ReserveValues(int64, bool) error + ResetValues() + GetValidBits() []byte + IncrementWritten(int64, int64) + ValuesWritten() int64 + ReleaseValidBits() *memory.Buffer + ReleaseValues() *memory.Buffer + NullCount() int64 + Values() []byte + SetPageReader(PageReader) + Retain() + Release() +} + +type binaryRecordReaderImpl interface { + recordReaderImpl + GetBuilderChunks() []array.Interface +} + +type primitiveRecordReader struct { + ColumnChunkReader + + valuesWritten 
int64 + valuesCap int64 + nullCount int64 + values *memory.Buffer + validBits *memory.Buffer + mem memory.Allocator + + refCount int64 + useValues bool +} + +func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) primitiveRecordReader { + return primitiveRecordReader{ + ColumnChunkReader: NewColumnReader(descr, nil, mem), + values: memory.NewResizableBuffer(mem), + validBits: memory.NewResizableBuffer(mem), + mem: mem, + refCount: 1, + useValues: descr.PhysicalType() != parquet.Types.ByteArray && descr.PhysicalType() != parquet.Types.FixedLenByteArray, + } +} + +func (pr *primitiveRecordReader) Retain() { + atomic.AddInt64(&pr.refCount, 1) +} + +func (pr *primitiveRecordReader) Release() { + if atomic.AddInt64(&pr.refCount, -1) == 0 { + if pr.values != nil { + pr.values.Release() + pr.values = nil + } + if pr.validBits != nil { + pr.validBits.Release() + pr.validBits = nil + } + } +} + +func (pr *primitiveRecordReader) SetPageReader(rdr PageReader) { + pr.ColumnChunkReader.setPageReader(rdr) +} + +func (pr *primitiveRecordReader) ReleaseValidBits() *memory.Buffer { + res := pr.validBits + res.Resize(int(bitutil.BytesForBits(pr.valuesWritten))) + pr.validBits = memory.NewResizableBuffer(pr.mem) + return res +} + +func (pr *primitiveRecordReader) ReleaseValues() (res *memory.Buffer) { + res = pr.values + nbytes, err := pr.numBytesForValues(pr.valuesWritten) + if err != nil { + panic(err) + } + res.Resize(int(nbytes)) + pr.values = memory.NewResizableBuffer(pr.mem) + pr.valuesCap = 0 + + return +} + +func (pr *primitiveRecordReader) NullCount() int64 { return pr.nullCount } + +func (pr *primitiveRecordReader) IncrementWritten(w, n int64) { + pr.valuesWritten += w + pr.nullCount += n +} +func (pr *primitiveRecordReader) GetValidBits() []byte { return pr.validBits.Bytes() } +func (pr *primitiveRecordReader) ValuesWritten() int64 { return pr.valuesWritten } +func (pr *primitiveRecordReader) Values() []byte { return pr.values.Bytes() } +func (pr 
*primitiveRecordReader) ResetValues() { + if pr.valuesWritten > 0 { + pr.values.ResizeNoShrink(0) + pr.validBits.ResizeNoShrink(0) + pr.valuesWritten = 0 + pr.valuesCap = 0 + pr.nullCount = 0 + } +} + +func (pr *primitiveRecordReader) numBytesForValues(nitems int64) (num int64, err error) { + typeSize := int64(pr.Descriptor().PhysicalType().ByteSize()) + var ok bool + if num, ok = overflow.Mul64(nitems, typeSize); !ok { + err = xerrors.New("total size of items too large") + } + return +} + +func (pr *primitiveRecordReader) ReserveValues(extra int64, hasNullable bool) error { + newCap, err := updateCapacity(pr.valuesCap, pr.valuesWritten, extra) + if err != nil { + return err + } + if newCap > pr.valuesCap { + capBytes, err := pr.numBytesForValues(newCap) + if err != nil { + return err + } + if pr.useValues { + pr.values.ResizeNoShrink(int(capBytes)) + } + pr.valuesCap = newCap + } + if hasNullable { + validBytesCap := bitutil.BytesForBits(pr.valuesCap) + if pr.validBits.Len() < int(validBytesCap) { + pr.validBits.ResizeNoShrink(int(validBytesCap)) + } + } + return nil +} + +func (pr *primitiveRecordReader) ReadValuesDense(toRead int64) (err error) { + switch cr := pr.ColumnChunkReader.(type) { + case *BooleanColumnChunkReader: + data := pr.values.Bytes()[int(pr.valuesWritten):] + values := *(*[]bool)(unsafe.Pointer(&data)) + _, err = cr.curDecoder.(encoding.BooleanDecoder).Decode(values[:toRead]) + case *Int32ColumnChunkReader: + values := arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Int32Decoder).Decode(values[:toRead]) + case *Int64ColumnChunkReader: + values := arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Int64Decoder).Decode(values[:toRead]) + case *Int96ColumnChunkReader: + values := parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Int96Decoder).Decode(values[:toRead]) + 
case *ByteArrayColumnChunkReader: + values := parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[:toRead]) + case *FixedLenByteArrayColumnChunkReader: + values := parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[:toRead]) + case *Float32ColumnChunkReader: + values := arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Float32Decoder).Decode(values[:toRead]) + case *Float64ColumnChunkReader: + values := arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Float64Decoder).Decode(values[:toRead]) + default: + panic("invalid type for record reader") + } + return +} + +func (pr *primitiveRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) (err error) { + validBits := pr.validBits.Bytes() + offset := pr.valuesWritten + + switch cr := pr.ColumnChunkReader.(type) { + case *BooleanColumnChunkReader: + data := pr.values.Bytes()[int(pr.valuesWritten):] + values := *(*[]bool)(unsafe.Pointer(&data)) + _, err = cr.curDecoder.(encoding.BooleanDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *Int32ColumnChunkReader: + values := arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Int32Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *Int64ColumnChunkReader: + values := arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Int64Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *Int96ColumnChunkReader: + values := parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = 
cr.curDecoder.(encoding.Int96Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *ByteArrayColumnChunkReader: + values := parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.ByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *FixedLenByteArrayColumnChunkReader: + values := parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *Float32ColumnChunkReader: + values := arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Float32Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + case *Float64ColumnChunkReader: + values := arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):] + _, err = cr.curDecoder.(encoding.Float64Decoder).DecodeSpaced(values[:int(valuesWithNulls)], int(nullCount), validBits, offset) + default: + panic("invalid type for record reader") + } + return +} + +type recordReader struct { + recordReaderImpl + leafInfo LevelInfo + + nullable bool + atRecStart bool + recordsRead int64 + + levelsWritten int64 + levelsPos int64 + levelsCap int64 + + defLevels *memory.Buffer + repLevels *memory.Buffer + + readDict bool +} + +type binaryRecordReader struct { + *recordReader +} + +func (b *binaryRecordReader) GetBuilderChunks() []array.Interface { + return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks() +} + +func newRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { + if mem == nil { + mem = memory.DefaultAllocator + } + + pr := createPrimitiveRecordReader(descr, mem) + return &recordReader{ + recordReaderImpl: &pr, + leafInfo: info, + defLevels: 
memory.NewResizableBuffer(mem), + repLevels: memory.NewResizableBuffer(mem), + } +} + +func (rr *recordReader) DefLevels() []int16 { + return arrow.Int16Traits.CastFromBytes(rr.defLevels.Bytes()) +} + +func (rr *recordReader) RepLevels() []int16 { + return arrow.Int16Traits.CastFromBytes(rr.repLevels.Bytes()) +} + +func (rr *recordReader) HasMore() bool { + return rr.pager() != nil +} + +func (rr *recordReader) SetPageReader(pr PageReader) { + rr.atRecStart = true + rr.recordReaderImpl.SetPageReader(pr) +} + +func (rr *recordReader) ValuesWritten() int { + return int(rr.recordReaderImpl.ValuesWritten()) +} + +func (rr *recordReader) LevelsPos() int64 { return rr.levelsPos } + +func updateCapacity(cap, size, extra int64) (int64, error) { + if extra < 0 { + return 0, xerrors.New("negative size (corrupt file?)") + } + target, ok := overflow.Add64(size, extra) + if !ok { + return 0, xerrors.New("allocation size too large (corrupt file?)") + } + if target >= (1 << 62) { + return 0, xerrors.New("allocation size too large (corrupt file?)") + } + if cap >= target { + return cap, nil + } + return int64(bitutil.NextPowerOf2(int(target))), nil +} + +func (rr *recordReader) Reserve(cap int64) error { + if err := rr.reserveLevels(cap); err != nil { + return err + } + if err := rr.reserveValues(cap); err != nil { + return err + } + return nil +} + +func (rr *recordReader) reserveLevels(extra int64) error { + if rr.Descriptor().MaxDefinitionLevel() > 0 { + newCap, err := updateCapacity(rr.levelsCap, rr.levelsWritten, extra) + if err != nil { + return err + } + + if newCap > rr.levelsCap { + capBytes, ok := overflow.Mul(int(newCap), arrow.Int16SizeBytes) + if !ok { + return xerrors.Errorf("allocation size too large (corrupt file?)") + } + rr.defLevels.ResizeNoShrink(capBytes) + if rr.Descriptor().MaxRepetitionLevel() > 0 { + rr.repLevels.ResizeNoShrink(capBytes) + } + rr.levelsCap = newCap + } + } + return nil +} + +func (rr *recordReader) reserveValues(extra int64) error { + 
return rr.recordReaderImpl.ReserveValues(extra, rr.leafInfo.HasNullableValues()) +} + +func (rr *recordReader) resetValues() { + rr.recordReaderImpl.ResetValues() +} + +func (rr *recordReader) Reset() { + rr.resetValues() + + if rr.levelsWritten > 0 { + remain := int(rr.levelsWritten - rr.levelsPos) + // shift remaining levels to beginning of buffer and trim only the + // number decoded remaining + defData := rr.DefLevels() + + copy(defData, defData[int(rr.levelsPos):int(rr.levelsWritten)]) + rr.defLevels.ResizeNoShrink(remain * int(arrow.Int16SizeBytes)) + + if rr.Descriptor().MaxRepetitionLevel() > 0 { + repData := rr.RepLevels() + copy(repData, repData[int(rr.levelsPos):int(rr.levelsWritten)]) + rr.repLevels.ResizeNoShrink(remain * int(arrow.Int16SizeBytes)) + } + + rr.levelsWritten -= rr.levelsPos + rr.levelsPos = 0 + rr.levelsCap = int64(remain) + } + + rr.recordsRead = 0 +} + +// process written rep/def levels to read the end of records +// process no more levels than necessary to delimit the indicated +// number of logical records. updates internal state of recordreader +// returns number of records delimited +func (rr *recordReader) delimitRecords(numRecords int64) (recordsRead, valsToRead int64) { + var ( + curRep int16 + curDef int16 + ) + + defLevels := rr.DefLevels()[int(rr.levelsPos):] + repLevels := rr.RepLevels()[int(rr.levelsPos):] + + for rr.levelsPos < rr.levelsWritten { + curRep, repLevels = repLevels[0], repLevels[1:] + if curRep == 0 { + // if at record start, we are seeing the start of a record + // for the second time, such as after repeated calls to delimitrecords. 
+ // in this case we must continue until we find another record start + // or exhaust the column chunk + if !rr.atRecStart { + // end of a record, increment count + recordsRead++ + if recordsRead == numRecords { + // found the number of records we wanted, set record start to true and break + rr.atRecStart = true + break + } + } + } + // we have decided to consume the level at this position + // advance until we find another boundary + rr.atRecStart = false + + curDef, defLevels = defLevels[0], defLevels[1:] + if curDef == rr.Descriptor().MaxDefinitionLevel() { + valsToRead++ + } + rr.levelsPos++ + } + return +} + +func (rr *recordReader) ReadRecordData(numRecords int64) (int64, error) { + possibleNum := utils.Max(numRecords, rr.levelsWritten-rr.levelsPos) + if err := rr.reserveValues(possibleNum); err != nil { + return 0, err + } + + var ( + startPos = rr.levelsPos + valuesToRead int64 + recordsRead int64 + nullCount int64 + err error + ) + + if rr.Descriptor().MaxRepetitionLevel() > 0 { + recordsRead, valuesToRead = rr.delimitRecords(numRecords) + } else if rr.Descriptor().MaxDefinitionLevel() > 0 { + // no repetition levels, skip delimiting logic. 
each level + // represents null or not null entry + recordsRead = utils.Min(rr.levelsWritten-rr.levelsPos, numRecords) + // this is advanced by delimitRecords which we skipped + rr.levelsPos += recordsRead + } else { + recordsRead, valuesToRead = numRecords, numRecords + } + + if rr.leafInfo.HasNullableValues() { + validityIO := ValidityBitmapInputOutput{ + ReadUpperBound: rr.levelsPos - startPos, + ValidBits: rr.GetValidBits(), + ValidBitsOffset: rr.recordReaderImpl.ValuesWritten(), + } + DefLevelsToBitmap(rr.DefLevels()[startPos:int(rr.levelsPos)], rr.leafInfo, &validityIO) + valuesToRead = validityIO.Read - validityIO.NullCount + nullCount = validityIO.NullCount + err = rr.ReadValuesSpaced(validityIO.Read, nullCount) + } else { + err = rr.ReadValuesDense(valuesToRead) + } + if err != nil { + return 0, err + } + + if rr.leafInfo.DefLevel > 0 { + rr.consumeBufferedValues(rr.levelsPos - startPos) + } else { + rr.consumeBufferedValues(valuesToRead) + } + + // total values, including nullspaces if any + rr.IncrementWritten(valuesToRead+nullCount, nullCount) + return recordsRead, nil +} + +const minLevelBatchSize = 1024 + +func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) { + // delimit records, then read values at the end + recordsRead := int64(0) + + if rr.levelsPos < rr.levelsWritten { + additional, err := rr.ReadRecordData(numRecords) + if err != nil { + return 0, err + } + recordsRead += additional + } + + levelBatch := utils.Max(minLevelBatchSize, numRecords) + + // if we are in the middle of a record, continue until reaching + // the desired number of records or the end of the current record + // if we have enough + for !rr.atRecStart || recordsRead < numRecords { + // is there more data in this row group? + if !rr.HasNext() { + if !rr.atRecStart { + // ended the row group while inside a record we haven't seen + // the end of yet. 
increment the record count for the last record + // in the row group + recordsRead++ + rr.atRecStart = true + } + break + } + + // we perform multiple batch reads until we either exhaust the row group + // or observe the desired number of records + batchSize := utils.Min(levelBatch, rr.numAvailValues()) + if batchSize == 0 { + // no more data in column + break + } + + if rr.Descriptor().MaxDefinitionLevel() > 0 { + if err := rr.reserveLevels(batchSize); err != nil { + return 0, err + } + + defLevels := rr.DefLevels()[int(rr.levelsWritten):] + + levelsRead := 0 + // not present for non-repeated fields + if rr.Descriptor().MaxRepetitionLevel() > 0 { + repLevels := rr.RepLevels()[int(rr.levelsWritten):] + levelsRead, _ = rr.readDefinitionLevels(defLevels[:batchSize]) + if rr.readRepetitionLevels(repLevels[:batchSize]) != levelsRead { + return 0, xerrors.New("number of decoded rep/def levels did not match") + } + } else if rr.Descriptor().MaxDefinitionLevel() > 0 { + levelsRead, _ = rr.readDefinitionLevels(defLevels[:batchSize]) + } + + if levelsRead == 0 { + // exhausted column chunk + break + } + + rr.levelsWritten += int64(levelsRead) + read, err := rr.ReadRecordData(numRecords - recordsRead) + if err != nil { + return recordsRead, err + } + recordsRead += read + } else { + // no rep or def levels + batchSize = utils.Min(numRecords-recordsRead, batchSize) + read, err := rr.ReadRecordData(batchSize) + if err != nil { + return recordsRead, err + } + recordsRead += read + } + } + + return recordsRead, nil +} + +func (rr *recordReader) ReleaseValidBits() *memory.Buffer { + if rr.leafInfo.HasNullableValues() { + return rr.recordReaderImpl.ReleaseValidBits() + } + return nil +} + +type flbaRecordReader struct { + primitiveRecordReader + + bldr *array.FixedSizeBinaryBuilder + valueBuf []parquet.FixedLenByteArray +} + +func (fr *flbaRecordReader) ReserveValues(extra int64, hasNullable bool) error { + fr.bldr.Reserve(int(extra)) + return 
fr.primitiveRecordReader.ReserveValues(extra, hasNullable) +} + +func (fr *flbaRecordReader) Retain() { + fr.bldr.Retain() + fr.primitiveRecordReader.Retain() +} + +func (fr *flbaRecordReader) Release() { + fr.bldr.Release() + fr.primitiveRecordReader.Release() +} + +func (fr *flbaRecordReader) ReadValuesDense(toRead int64) error { + if int64(cap(fr.valueBuf)) < toRead { + fr.valueBuf = make([]parquet.FixedLenByteArray, 0, toRead) + } + + values := fr.valueBuf[:toRead] + dec := fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder) + + _, err := dec.Decode(values) + if err != nil { + return err + } + + for _, val := range values { + fr.bldr.Append(val) + } + fr.ResetValues() + return nil +} + +func (fr *flbaRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) error { + validBits := fr.validBits.Bytes() + offset := fr.valuesWritten + + if int64(cap(fr.valueBuf)) < valuesWithNulls { + fr.valueBuf = make([]parquet.FixedLenByteArray, 0, valuesWithNulls) + } + + values := fr.valueBuf[:valuesWithNulls] + dec := fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder) + _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset) + if err != nil { + return err + } + + for idx, val := range values { + if bitutil.BitIsSet(validBits, int(offset)+idx) { + fr.bldr.Append(val) + } else { + fr.bldr.AppendNull() + } + } + fr.ResetValues() + return nil +} + +func (fr *flbaRecordReader) GetBuilderChunks() []array.Interface { + return []array.Interface{fr.bldr.NewArray()} +} + +func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { + if mem == nil { + mem = memory.DefaultAllocator + } + + byteWidth := descr.TypeLength() + + return &binaryRecordReader{&recordReader{ + recordReaderImpl: &flbaRecordReader{ + createPrimitiveRecordReader(descr, mem), + array.NewFixedSizeBinaryBuilder(mem, 
&arrow.FixedSizeBinaryType{ByteWidth: byteWidth}), + nil, + }, + leafInfo: info, + defLevels: memory.NewResizableBuffer(mem), + repLevels: memory.NewResizableBuffer(mem), + }} +} + +type byteArrayRecordReader struct { + primitiveRecordReader + + bldr *array.BinaryBuilder + valueBuf []parquet.ByteArray +} + +func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator) RecordReader { + if mem == nil { + mem = memory.DefaultAllocator + } + + dt := arrow.BinaryTypes.Binary + if descr.LogicalType().Equals(schema.StringLogicalType{}) { + dt = arrow.BinaryTypes.String + } + + return &binaryRecordReader{&recordReader{ + recordReaderImpl: &byteArrayRecordReader{ + createPrimitiveRecordReader(descr, mem), + array.NewBinaryBuilder(mem, dt), + nil, + }, + leafInfo: info, + defLevels: memory.NewResizableBuffer(mem), + repLevels: memory.NewResizableBuffer(mem), + }} +} + +func (fr *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool) error { + fr.bldr.Reserve(int(extra)) + return fr.primitiveRecordReader.ReserveValues(extra, hasNullable) +} + +func (fr *byteArrayRecordReader) Retain() { + fr.bldr.Retain() + fr.primitiveRecordReader.Retain() +} + +func (fr *byteArrayRecordReader) Release() { + fr.bldr.Release() + fr.primitiveRecordReader.Release() +} + +func (br *byteArrayRecordReader) ReadValuesDense(toRead int64) error { + if int64(cap(br.valueBuf)) < toRead { + br.valueBuf = make([]parquet.ByteArray, 0, toRead) + } + + values := br.valueBuf[:toRead] + dec := br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder) + + _, err := dec.Decode(values) + if err != nil { + return err + } + + for _, val := range values { + br.bldr.Append(val) + } + br.ResetValues() + return nil +} + +func (br *byteArrayRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) error { + validBits := br.validBits.Bytes() + offset := br.valuesWritten + + if int64(cap(br.valueBuf)) < valuesWithNulls { + br.valueBuf = 
make([]parquet.ByteArray, 0, valuesWithNulls) + } + + values := br.valueBuf[:valuesWithNulls] + dec := br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder) + _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset) + if err != nil { + return err + } + + for idx, val := range values { + if bitutil.BitIsSet(validBits, int(offset)+idx) { + br.bldr.Append(val) + } else { + br.bldr.AppendNull() + } + } + br.ResetValues() + return nil +} + +func (br *byteArrayRecordReader) GetBuilderChunks() []array.Interface { + return []array.Interface{br.bldr.NewArray()} +} + +// TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done + +func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator) RecordReader { + switch descr.PhysicalType() { + case parquet.Types.ByteArray: + return newByteArrayRecordReader(descr, info, mem) + case parquet.Types.FixedLenByteArray: + return newFLBARecordReader(descr, info, mem) + default: + return newRecordReader(descr, info, mem) + } +} diff --git a/go/parquet/go.mod b/go/parquet/go.mod index d1e28dd..25f3f64 100644 --- a/go/parquet/go.mod +++ b/go/parquet/go.mod @@ -21,7 +21,7 @@ go 1.15 require ( github.com/JohnCGriffin/overflow v0.0.0-20170615021017-4d914c927216 github.com/andybalholm/brotli v1.0.3 - github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb + github.com/apache/arrow/go/arrow v0.0.0-20211109162857-ed8c76e74346 github.com/apache/thrift v0.15.0 github.com/golang/snappy v0.0.3 github.com/klauspost/asmfmt v1.2.3 @@ -36,3 +36,5 @@ require ( golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 gonum.org/v1/gonum v0.9.3 ) + +replace github.com/apache/arrow/go/arrow => github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7 diff --git a/go/parquet/go.sum b/go/parquet/go.sum index 1cdcc47..68a9fd2 100644 --- a/go/parquet/go.sum +++ b/go/parquet/go.sum @@ -11,8 +11,6 @@ github.com/ajstarks/svgo 
v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3 github.com/andybalholm/brotli v1.0.3 h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM= github.com/andybalholm/brotli v1.0.3/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb h1:4t4siO1kRtmrdKcOKXxZvtFpCP/bJQW7LA3qABUhdEY= -github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= github.com/apache/thrift v0.15.0 h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y= github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= @@ -57,6 +55,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -68,6 +67,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp 
v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= @@ -101,6 +101,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zeebo/xxh3 v0.10.0 h1:1+2Mov9zfxTNUeoDG9k9i13VfxTR0p1JQu8L0vikxB0= github.com/zeebo/xxh3 v0.10.0/go.mod h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg= +github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7 h1:JjJo15d72a2bpc254UpkztI5X62iDEmdfjECOd5jCdQ= +github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7/go.mod h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -147,6 +149,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -173,6 +176,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9sn golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -208,6 +212,7 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79 h1:s1jFTXJryg4a1mew7xv03VZD8N9XjxFhk1o4Js4WvPQ= google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= @@ -216,6 +221,7 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc 
v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -228,6 +234,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go index 65bee37..5d11718 100644 --- a/go/parquet/pqarrow/encode_arrow.go +++ b/go/parquet/pqarrow/encode_arrow.go @@ -35,7 +35,7 @@ import ( func calcLeafCount(dt arrow.DataType) int { switch dt.ID() { - case arrow.EXTENSION, arrow.UNION: + case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION: panic("arrow type not implemented") case arrow.LIST: return calcLeafCount(dt.(*arrow.ListType).Elem()) @@ -158,7 +158,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context) error { arrCtx := arrowCtxFromContext(ctx) for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ { var ( - cw file.ColumnWriter + cw file.ColumnChunkWriter err error ) @@ -203,7 +203,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context) error { // leafArr is always a primitive (possibly 
dictionary encoded type). // Leaf_field_nullable indicates whether the leaf array is considered nullable // according to its schema in a Table or its parent array. -func WriteArrowToColumn(ctx context.Context, cw file.ColumnWriter, leafArr array.Interface, defLevels, repLevels []int16, leafFieldNullable bool) error { +func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, leafArr array.Interface, defLevels, repLevels []int16, leafFieldNullable bool) error { // Leaf nulls are canonical when there is only a single null element after a list // and it is at the leaf. colLevelInfo := cw.LevelInfo() @@ -223,10 +223,11 @@ func WriteArrowToColumn(ctx context.Context, cw file.ColumnWriter, leafArr array } type binaryarr interface { + ValueBytes() []byte ValueOffsets() []int32 } -func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array.Interface, defLevels, repLevels []int16, maybeParentNulls bool) (err error) { +func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr array.Interface, defLevels, repLevels []int16, maybeParentNulls bool) (err error) { noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0 if ctx.dataBuffer == nil { @@ -234,7 +235,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array } switch wr := cw.(type) { - case *file.BooleanColumnWriter: + case *file.BooleanColumnChunkWriter: if leafArr.DataType().ID() != arrow.BOOL { return xerrors.Errorf("type mismatch, column is %s, array is %s", cw.Type(), leafArr.DataType().ID()) } @@ -255,7 +256,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array } else { wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) } - case *file.Int32ColumnWriter: + case *file.Int32ColumnChunkWriter: var data []int32 switch leafArr.DataType().ID() { case arrow.INT32: @@ -310,7 +311,7 @@ func 
writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array nulls := leafArr.NullBitmapBytes() wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) } - case *file.Int64ColumnWriter: + case *file.Int64ColumnChunkWriter: var data []int64 switch leafArr.DataType().ID() { case arrow.TIMESTAMP: @@ -372,7 +373,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array nulls := leafArr.NullBitmapBytes() wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) } - case *file.Int96ColumnWriter: + case *file.Int96ColumnChunkWriter: if leafArr.DataType().ID() != arrow.TIMESTAMP { return xerrors.New("unsupported arrow type to write to Int96 column") } @@ -390,7 +391,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array nulls := leafArr.NullBitmapBytes() wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, int64(leafArr.Data().Offset())) } - case *file.Float32ColumnWriter: + case *file.Float32ColumnChunkWriter: if leafArr.DataType().ID() != arrow.FLOAT32 { return xerrors.New("invalid column type to write to Float") } @@ -399,7 +400,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array } else { wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) } - case *file.Float64ColumnWriter: + case *file.Float64ColumnChunkWriter: if leafArr.DataType().ID() != arrow.FLOAT64 { return xerrors.New("invalid column type to write to Float") } @@ -408,26 +409,19 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array } else { wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) } - case *file.ByteArrayColumnWriter: + case *file.ByteArrayColumnChunkWriter: if leafArr.DataType().ID() != arrow.STRING && 
leafArr.DataType().ID() != arrow.BINARY { return xerrors.New("invalid column type to write to ByteArray") } var ( - offsets = leafArr.(binaryarr).ValueOffsets() - buffer = leafArr.Data().Buffers()[2] - valueBuf []byte + buffer = leafArr.(binaryarr).ValueBytes() + offsets = leafArr.(binaryarr).ValueOffsets() ) - if buffer == nil { - valueBuf = []byte{} - } else { - valueBuf = buffer.Bytes() - } - data := make([]parquet.ByteArray, leafArr.Len()) for i := range data { - data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]]) + data[i] = parquet.ByteArray(buffer[offsets[i]:offsets[i+1]]) } if !maybeParentNulls && noNulls { wr.WriteBatch(data, defLevels, repLevels) @@ -435,7 +429,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array wr.WriteBatchSpaced(data, defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset())) } - case *file.FixedLenByteArrayColumnWriter: + case *file.FixedLenByteArrayColumnChunkWriter: switch dt := leafArr.DataType().(type) { case *arrow.FixedSizeBinaryType: data := make([]parquet.FixedLenByteArray, leafArr.Len()) @@ -451,8 +445,8 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr array // parquet decimal are stored with FixedLength values where the length is // proportional to the precision. Arrow's Decimal are always stored with 16/32 // bytes. 
thus the internal FLBA must be adjusted by the offset calculation - offset := dt.BitWidth() - int(DecimalSize(dt.Precision)) - ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth()) + offset := arrow.Decimal128SizeBytes - int(DecimalSize(dt.Precision)) + ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * arrow.Decimal128SizeBytes) scratch := ctx.dataBuffer.Bytes() typeLen := wr.Descr().TypeLength() fixDecimalEndianness := func(in decimal128.Num) parquet.FixedLenByteArray { diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index be24497..84d8f22 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -162,7 +162,7 @@ func TestWriteArrowCols(t *testing.T) { ) switch expected.Schema().Field(i).Type.(arrow.FixedWidthDataType).BitWidth() { case 32: - colReader := rgr.Column(i).(*file.Int32ColumnReader) + colReader := rgr.Column(i).(*file.Int32ColumnChunkReader) vals := make([]int32, int(expected.NumRows())) total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil) require.NoError(t, err) @@ -182,7 +182,7 @@ func TestWriteArrowCols(t *testing.T) { } } case 64: - colReader := rgr.Column(i).(*file.Int64ColumnReader) + colReader := rgr.Column(i).(*file.Int64ColumnChunkReader) vals := make([]int64, int(expected.NumRows())) total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil) require.NoError(t, err) @@ -249,7 +249,7 @@ func TestWriteArrowInt96(t *testing.T) { tsRdr := rgr.Column(3) assert.Equal(t, parquet.Types.Int96, tsRdr.Type()) - rdr := tsRdr.(*file.Int96ColumnReader) + rdr := tsRdr.(*file.Int96ColumnChunkReader) vals := make([]parquet.Int96, expected.NumRows()) defLevels := make([]int16, int(expected.NumRows())) @@ -770,7 +770,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() { rgw := writer.AppendRowGroup() cw, _ := rgw.NextColumn() - 
cw.(*file.ByteArrayColumnWriter).WriteBatch(bigEndian, nil, nil) + cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil) cw.Close() rgw.Close() writer.Close() @@ -820,21 +820,21 @@ func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(data []byte, values a } var fullTypeList = []arrow.DataType{ - arrow.FixedWidthTypes.Boolean, - arrow.PrimitiveTypes.Uint8, - arrow.PrimitiveTypes.Int8, - arrow.PrimitiveTypes.Uint16, - arrow.PrimitiveTypes.Int16, - arrow.PrimitiveTypes.Uint32, - arrow.PrimitiveTypes.Int32, - arrow.PrimitiveTypes.Uint64, - arrow.PrimitiveTypes.Int64, - arrow.FixedWidthTypes.Date32, - arrow.PrimitiveTypes.Float32, - arrow.PrimitiveTypes.Float64, - arrow.BinaryTypes.String, - arrow.BinaryTypes.Binary, - &arrow.FixedSizeBinaryType{ByteWidth: 10}, + // arrow.FixedWidthTypes.Boolean, + // arrow.PrimitiveTypes.Uint8, + // arrow.PrimitiveTypes.Int8, + // arrow.PrimitiveTypes.Uint16, + // arrow.PrimitiveTypes.Int16, + // arrow.PrimitiveTypes.Uint32, + // arrow.PrimitiveTypes.Int32, + // arrow.PrimitiveTypes.Uint64, + // arrow.PrimitiveTypes.Int64, + // arrow.FixedWidthTypes.Date32, + // arrow.PrimitiveTypes.Float32, + // arrow.PrimitiveTypes.Float64, + // arrow.BinaryTypes.String, + // arrow.BinaryTypes.Binary, + // &arrow.FixedSizeBinaryType{ByteWidth: 10}, &arrow.Decimal128Type{Precision: 1, Scale: 0}, &arrow.Decimal128Type{Precision: 5, Scale: 4}, &arrow.Decimal128Type{Precision: 10, Scale: 9}, diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go index 1f0a946..5109602 100644 --- a/go/parquet/pqarrow/file_writer.go +++ b/go/parquet/pqarrow/file_writer.go @@ -143,7 +143,7 @@ func (fw *FileWriter) WriteBuffered(rec array.Record) error { var ( recList []array.Record maxRows = fw.wr.Properties().MaxRowGroupLength() - curRows int64 + curRows int err error ) if fw.rgw != nil { @@ -154,12 +154,12 @@ func (fw *FileWriter) WriteBuffered(rec array.Record) error { fw.NewBufferedRowGroup() } - if curRows+rec.NumRows() 
<= maxRows { + if int64(curRows)+rec.NumRows() <= maxRows { recList = []array.Record{rec} } else { - recList = []array.Record{rec.NewSlice(0, maxRows-curRows)} + recList = []array.Record{rec.NewSlice(0, maxRows-int64(curRows))} defer recList[0].Release() - for offset := int64(maxRows - curRows); offset < rec.NumRows(); offset += maxRows { + for offset := int64(maxRows - int64(curRows)); offset < rec.NumRows(); offset += maxRows { s := rec.NewSlice(offset, offset+utils.Min(maxRows, rec.NumRows()-offset)) defer s.Release() recList = append(recList, s) diff --git a/go/parquet/pqarrow/path_builder.go b/go/parquet/pqarrow/path_builder.go index a90644c..cd7c577 100644 --- a/go/parquet/pqarrow/path_builder.go +++ b/go/parquet/pqarrow/path_builder.go @@ -427,7 +427,7 @@ func (p *pathBuilder) Visit(arr array.Interface) error { return nil case arrow.EXTENSION: return xerrors.New("extension types not implemented yet") - case arrow.UNION: + case arrow.SPARSE_UNION, arrow.DENSE_UNION: return xerrors.New("union types aren't supported in parquet") default: p.addTerminalInfo(arr) diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index 5c72a41..22810e6 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -913,7 +913,7 @@ func getNestedFactory(origin, inferred arrow.DataType) func(fieldList []arrow.Fi func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (modified bool, err error) { nchildren := len(inferred.Children) switch origin.Type.ID() { - case arrow.EXTENSION, arrow.UNION, arrow.DICTIONARY: + case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION, arrow.DICTIONARY: err = xerrors.New("unimplemented type") case arrow.STRUCT: typ := origin.Type.(*arrow.StructType)
