This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch temp-parquet-pqarrow
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 879ca48f1a7ad0da6015a09c32eab64545fa77e9
Author: Matthew Topol <[email protected]>
AuthorDate: Tue Nov 9 13:25:39 2021 -0500

    update to latest arrow changes, fix issues
---
 go/parquet/file/level_conversion.go     |  46 +-
 go/parquet/file/record_reader.go        | 799 ++++++++++++++++++++++++++++++++
 go/parquet/go.mod                       |   4 +-
 go/parquet/go.sum                       |  11 +-
 go/parquet/pqarrow/encode_arrow.go      |  42 +-
 go/parquet/pqarrow/encode_arrow_test.go |  38 +-
 go/parquet/pqarrow/file_writer.go       |   8 +-
 go/parquet/pqarrow/path_builder.go      |   2 +-
 go/parquet/pqarrow/schema.go            |   2 +-
 9 files changed, 882 insertions(+), 70 deletions(-)

diff --git a/go/parquet/file/level_conversion.go 
b/go/parquet/file/level_conversion.go
index 5d55848..381bdbc 100644
--- a/go/parquet/file/level_conversion.go
+++ b/go/parquet/file/level_conversion.go
@@ -19,6 +19,7 @@ package file
 import (
        "math"
        "math/bits"
+       "unsafe"
 
        "github.com/apache/arrow/go/parquet"
        "github.com/apache/arrow/go/parquet/internal/bmi"
@@ -136,28 +137,37 @@ type ValidityBitmapInputOutput struct {
 }
 
 // create a bitmap out of the definition Levels and return the number of 
non-null values
-func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info 
LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) uint64 {
-       definedBitmap := bmi.GreaterThanBitmap(defLevels, info.DefLevel-1)
+func defLevelsBatchToBitmap(defLevels []int16, remainingUpperBound int64, info 
LevelInfo, wr utils.BitmapWriter, hasRepeatedParent bool) (count uint64) {
+       const maxbatch = 8 * int(unsafe.Sizeof(uint64(0)))
 
-       if hasRepeatedParent {
-               // Greater than level_info.repeated_ancestor_def_level - 1 
implies >= the
-               // repeated_ancestor_def_level
-               presentBitmap := bmi.GreaterThanBitmap(defLevels, 
info.RepeatedAncestorDefLevel-1)
-               selectedBits := bmi.ExtractBits(definedBitmap, presentBitmap)
-               selectedCount := int64(bits.OnesCount64(presentBitmap))
-               if selectedCount > remainingUpperBound {
-                       panic("values read exceeded upper bound")
+       var batch []int16
+       for len(defLevels) > 0 {
+               batchSize := utils.MinInt(maxbatch, len(defLevels))
+               batch, defLevels = defLevels[:batchSize], defLevels[batchSize:]
+               definedBitmap := bmi.GreaterThanBitmap(batch, info.DefLevel-1)
+
+               if hasRepeatedParent {
+                       // Greater than level_info.repeated_ancestor_def_level 
- 1 implies >= the
+                       // repeated_ancestor_def_level
+                       presentBitmap := bmi.GreaterThanBitmap(batch, 
info.RepeatedAncestorDefLevel-1)
+                       selectedBits := bmi.ExtractBits(definedBitmap, 
presentBitmap)
+                       selectedCount := int64(bits.OnesCount64(presentBitmap))
+                       if selectedCount > remainingUpperBound {
+                               panic("values read exceeded upper bound")
+                       }
+                       wr.AppendWord(selectedBits, selectedCount)
+                       count += uint64(bits.OnesCount64(selectedBits))
+                       continue
                }
-               wr.AppendWord(selectedBits, selectedCount)
-               return uint64(bits.OnesCount64(selectedBits))
-       }
 
-       if int64(len(defLevels)) > remainingUpperBound {
-               panic("values read exceed upper bound")
-       }
+               if int64(len(defLevels)) > remainingUpperBound {
+                       panic("values read exceed upper bound")
+               }
 
-       wr.AppendWord(definedBitmap, int64(len(defLevels)))
-       return uint64(bits.OnesCount64(definedBitmap))
+               wr.AppendWord(definedBitmap, int64(len(batch)))
+               count += uint64(bits.OnesCount64(definedBitmap))
+       }
+       return
 }
 
 // create a bitmap out of the definition Levels
diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
new file mode 100644
index 0000000..8f92512
--- /dev/null
+++ b/go/parquet/file/record_reader.go
@@ -0,0 +1,799 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package file
+
+import (
+       "sync/atomic"
+       "unsafe"
+
+       "github.com/JohnCGriffin/overflow"
+       "github.com/apache/arrow/go/arrow"
+       "github.com/apache/arrow/go/arrow/array"
+       "github.com/apache/arrow/go/arrow/bitutil"
+       "github.com/apache/arrow/go/arrow/memory"
+       "github.com/apache/arrow/go/parquet"
+       "github.com/apache/arrow/go/parquet/internal/encoding"
+       "github.com/apache/arrow/go/parquet/internal/utils"
+       "github.com/apache/arrow/go/parquet/schema"
+       "golang.org/x/xerrors"
+)
+
+type RecordReader interface {
+       DefLevels() []int16
+       LevelsPos() int64
+       RepLevels() []int16
+       Reset()
+       Reserve(int64) error
+       HasMore() bool
+       ReadRecords(num int64) (int64, error)
+       ValuesWritten() int
+       ReleaseValidBits() *memory.Buffer
+       ReleaseValues() *memory.Buffer
+       NullCount() int64
+       Type() parquet.Type
+       Values() []byte
+       SetPageReader(PageReader)
+       Retain()
+       Release()
+}
+
+type BinaryRecordReader interface {
+       RecordReader
+       GetBuilderChunks() []array.Interface
+}
+
+type recordReaderImpl interface {
+       ColumnChunkReader
+       ReadValuesDense(int64) error
+       ReadValuesSpaced(int64, int64) error
+       ReserveValues(int64, bool) error
+       ResetValues()
+       GetValidBits() []byte
+       IncrementWritten(int64, int64)
+       ValuesWritten() int64
+       ReleaseValidBits() *memory.Buffer
+       ReleaseValues() *memory.Buffer
+       NullCount() int64
+       Values() []byte
+       SetPageReader(PageReader)
+       Retain()
+       Release()
+}
+
+type binaryRecordReaderImpl interface {
+       recordReaderImpl
+       GetBuilderChunks() []array.Interface
+}
+
+type primitiveRecordReader struct {
+       ColumnChunkReader
+
+       valuesWritten int64
+       valuesCap     int64
+       nullCount     int64
+       values        *memory.Buffer
+       validBits     *memory.Buffer
+       mem           memory.Allocator
+
+       refCount  int64
+       useValues bool
+}
+
+func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator) 
primitiveRecordReader {
+       return primitiveRecordReader{
+               ColumnChunkReader: NewColumnReader(descr, nil, mem),
+               values:            memory.NewResizableBuffer(mem),
+               validBits:         memory.NewResizableBuffer(mem),
+               mem:               mem,
+               refCount:          1,
+               useValues:         descr.PhysicalType() != 
parquet.Types.ByteArray && descr.PhysicalType() != 
parquet.Types.FixedLenByteArray,
+       }
+}
+
+func (pr *primitiveRecordReader) Retain() {
+       atomic.AddInt64(&pr.refCount, 1)
+}
+
+func (pr *primitiveRecordReader) Release() {
+       if atomic.AddInt64(&pr.refCount, -1) == 0 {
+               if pr.values != nil {
+                       pr.values.Release()
+                       pr.values = nil
+               }
+               if pr.validBits != nil {
+                       pr.validBits.Release()
+                       pr.validBits = nil
+               }
+       }
+}
+
+func (pr *primitiveRecordReader) SetPageReader(rdr PageReader) {
+       pr.ColumnChunkReader.setPageReader(rdr)
+}
+
+func (pr *primitiveRecordReader) ReleaseValidBits() *memory.Buffer {
+       res := pr.validBits
+       res.Resize(int(bitutil.BytesForBits(pr.valuesWritten)))
+       pr.validBits = memory.NewResizableBuffer(pr.mem)
+       return res
+}
+
+func (pr *primitiveRecordReader) ReleaseValues() (res *memory.Buffer) {
+       res = pr.values
+       nbytes, err := pr.numBytesForValues(pr.valuesWritten)
+       if err != nil {
+               panic(err)
+       }
+       res.Resize(int(nbytes))
+       pr.values = memory.NewResizableBuffer(pr.mem)
+       pr.valuesCap = 0
+
+       return
+}
+
+func (pr *primitiveRecordReader) NullCount() int64 { return pr.nullCount }
+
+func (pr *primitiveRecordReader) IncrementWritten(w, n int64) {
+       pr.valuesWritten += w
+       pr.nullCount += n
+}
+func (pr *primitiveRecordReader) GetValidBits() []byte { return 
pr.validBits.Bytes() }
+func (pr *primitiveRecordReader) ValuesWritten() int64 { return 
pr.valuesWritten }
+func (pr *primitiveRecordReader) Values() []byte       { return 
pr.values.Bytes() }
+func (pr *primitiveRecordReader) ResetValues() {
+       if pr.valuesWritten > 0 {
+               pr.values.ResizeNoShrink(0)
+               pr.validBits.ResizeNoShrink(0)
+               pr.valuesWritten = 0
+               pr.valuesCap = 0
+               pr.nullCount = 0
+       }
+}
+
+func (pr *primitiveRecordReader) numBytesForValues(nitems int64) (num int64, 
err error) {
+       typeSize := int64(pr.Descriptor().PhysicalType().ByteSize())
+       var ok bool
+       if num, ok = overflow.Mul64(nitems, typeSize); !ok {
+               err = xerrors.New("total size of items too large")
+       }
+       return
+}
+
+func (pr *primitiveRecordReader) ReserveValues(extra int64, hasNullable bool) 
error {
+       newCap, err := updateCapacity(pr.valuesCap, pr.valuesWritten, extra)
+       if err != nil {
+               return err
+       }
+       if newCap > pr.valuesCap {
+               capBytes, err := pr.numBytesForValues(newCap)
+               if err != nil {
+                       return err
+               }
+               if pr.useValues {
+                       pr.values.ResizeNoShrink(int(capBytes))
+               }
+               pr.valuesCap = newCap
+       }
+       if hasNullable {
+               validBytesCap := bitutil.BytesForBits(pr.valuesCap)
+               if pr.validBits.Len() < int(validBytesCap) {
+                       pr.validBits.ResizeNoShrink(int(validBytesCap))
+               }
+       }
+       return nil
+}
+
+func (pr *primitiveRecordReader) ReadValuesDense(toRead int64) (err error) {
+       switch cr := pr.ColumnChunkReader.(type) {
+       case *BooleanColumnChunkReader:
+               data := pr.values.Bytes()[int(pr.valuesWritten):]
+               values := *(*[]bool)(unsafe.Pointer(&data))
+               _, err = 
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[:toRead])
+       case *Int32ColumnChunkReader:
+               values := 
arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int32Decoder).Decode(values[:toRead])
+       case *Int64ColumnChunkReader:
+               values := 
arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int64Decoder).Decode(values[:toRead])
+       case *Int96ColumnChunkReader:
+               values := 
parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int96Decoder).Decode(values[:toRead])
+       case *ByteArrayColumnChunkReader:
+               values := 
parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[:toRead])
+       case *FixedLenByteArrayColumnChunkReader:
+               values := 
parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[:toRead])
+       case *Float32ColumnChunkReader:
+               values := 
arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Float32Decoder).Decode(values[:toRead])
+       case *Float64ColumnChunkReader:
+               values := 
arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Float64Decoder).Decode(values[:toRead])
+       default:
+               panic("invalid type for record reader")
+       }
+       return
+}
+
+func (pr *primitiveRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount 
int64) (err error) {
+       validBits := pr.validBits.Bytes()
+       offset := pr.valuesWritten
+
+       switch cr := pr.ColumnChunkReader.(type) {
+       case *BooleanColumnChunkReader:
+               data := pr.values.Bytes()[int(pr.valuesWritten):]
+               values := *(*[]bool)(unsafe.Pointer(&data))
+               _, err = 
cr.curDecoder.(encoding.BooleanDecoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *Int32ColumnChunkReader:
+               values := 
arrow.Int32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int32Decoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *Int64ColumnChunkReader:
+               values := 
arrow.Int64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int64Decoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *Int96ColumnChunkReader:
+               values := 
parquet.Int96Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Int96Decoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *ByteArrayColumnChunkReader:
+               values := 
parquet.ByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.ByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *FixedLenByteArrayColumnChunkReader:
+               values := 
parquet.FixedLenByteArrayTraits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *Float32ColumnChunkReader:
+               values := 
arrow.Float32Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Float32Decoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       case *Float64ColumnChunkReader:
+               values := 
arrow.Float64Traits.CastFromBytes(pr.values.Bytes())[int(pr.valuesWritten):]
+               _, err = 
cr.curDecoder.(encoding.Float64Decoder).DecodeSpaced(values[:int(valuesWithNulls)],
 int(nullCount), validBits, offset)
+       default:
+               panic("invalid type for record reader")
+       }
+       return
+}
+
+type recordReader struct {
+       recordReaderImpl
+       leafInfo LevelInfo
+
+       nullable    bool
+       atRecStart  bool
+       recordsRead int64
+
+       levelsWritten int64
+       levelsPos     int64
+       levelsCap     int64
+
+       defLevels *memory.Buffer
+       repLevels *memory.Buffer
+
+       readDict bool
+}
+
+type binaryRecordReader struct {
+       *recordReader
+}
+
+func (b *binaryRecordReader) GetBuilderChunks() []array.Interface {
+       return b.recordReaderImpl.(binaryRecordReaderImpl).GetBuilderChunks()
+}
+
+func newRecordReader(descr *schema.Column, info LevelInfo, mem 
memory.Allocator) RecordReader {
+       if mem == nil {
+               mem = memory.DefaultAllocator
+       }
+
+       pr := createPrimitiveRecordReader(descr, mem)
+       return &recordReader{
+               recordReaderImpl: &pr,
+               leafInfo:         info,
+               defLevels:        memory.NewResizableBuffer(mem),
+               repLevels:        memory.NewResizableBuffer(mem),
+       }
+}
+
+func (rr *recordReader) DefLevels() []int16 {
+       return arrow.Int16Traits.CastFromBytes(rr.defLevels.Bytes())
+}
+
+func (rr *recordReader) RepLevels() []int16 {
+       return arrow.Int16Traits.CastFromBytes(rr.repLevels.Bytes())
+}
+
+func (rr *recordReader) HasMore() bool {
+       return rr.pager() != nil
+}
+
+func (rr *recordReader) SetPageReader(pr PageReader) {
+       rr.atRecStart = true
+       rr.recordReaderImpl.SetPageReader(pr)
+}
+
+func (rr *recordReader) ValuesWritten() int {
+       return int(rr.recordReaderImpl.ValuesWritten())
+}
+
+func (rr *recordReader) LevelsPos() int64 { return rr.levelsPos }
+
+func updateCapacity(cap, size, extra int64) (int64, error) {
+       if extra < 0 {
+               return 0, xerrors.New("negative size (corrupt file?)")
+       }
+       target, ok := overflow.Add64(size, extra)
+       if !ok {
+               return 0, xerrors.New("allocation size too large (corrupt 
file?)")
+       }
+       if target >= (1 << 62) {
+               return 0, xerrors.New("allocation size too large (corrupt 
file?)")
+       }
+       if cap >= target {
+               return cap, nil
+       }
+       return int64(bitutil.NextPowerOf2(int(target))), nil
+}
+
+func (rr *recordReader) Reserve(cap int64) error {
+       if err := rr.reserveLevels(cap); err != nil {
+               return err
+       }
+       if err := rr.reserveValues(cap); err != nil {
+               return err
+       }
+       return nil
+}
+
+func (rr *recordReader) reserveLevels(extra int64) error {
+       if rr.Descriptor().MaxDefinitionLevel() > 0 {
+               newCap, err := updateCapacity(rr.levelsCap, rr.levelsWritten, 
extra)
+               if err != nil {
+                       return err
+               }
+
+               if newCap > rr.levelsCap {
+                       capBytes, ok := overflow.Mul(int(newCap), 
arrow.Int16SizeBytes)
+                       if !ok {
+                               return xerrors.Errorf("allocation size too 
large (corrupt file?)")
+                       }
+                       rr.defLevels.ResizeNoShrink(capBytes)
+                       if rr.Descriptor().MaxRepetitionLevel() > 0 {
+                               rr.repLevels.ResizeNoShrink(capBytes)
+                       }
+                       rr.levelsCap = newCap
+               }
+       }
+       return nil
+}
+
+func (rr *recordReader) reserveValues(extra int64) error {
+       return rr.recordReaderImpl.ReserveValues(extra, 
rr.leafInfo.HasNullableValues())
+}
+
+func (rr *recordReader) resetValues() {
+       rr.recordReaderImpl.ResetValues()
+}
+
+func (rr *recordReader) Reset() {
+       rr.resetValues()
+
+       if rr.levelsWritten > 0 {
+               remain := int(rr.levelsWritten - rr.levelsPos)
+               // shift remaining levels to beginning of buffer and trim only 
the
+               // number decoded remaining
+               defData := rr.DefLevels()
+
+               copy(defData, defData[int(rr.levelsPos):int(rr.levelsWritten)])
+               rr.defLevels.ResizeNoShrink(remain * int(arrow.Int16SizeBytes))
+
+               if rr.Descriptor().MaxRepetitionLevel() > 0 {
+                       repData := rr.RepLevels()
+                       copy(repData, 
repData[int(rr.levelsPos):int(rr.levelsWritten)])
+                       rr.repLevels.ResizeNoShrink(remain * 
int(arrow.Int16SizeBytes))
+               }
+
+               rr.levelsWritten -= rr.levelsPos
+               rr.levelsPos = 0
+               rr.levelsCap = int64(remain)
+       }
+
+       rr.recordsRead = 0
+}
+
+// process written rep/def levels to read the end of records
+// process no more levels than necessary to delimit the indicated
+// number of logical records. updates internal state of recordreader
+// returns number of records delimited
+func (rr *recordReader) delimitRecords(numRecords int64) (recordsRead, 
valsToRead int64) {
+       var (
+               curRep int16
+               curDef int16
+       )
+
+       defLevels := rr.DefLevels()[int(rr.levelsPos):]
+       repLevels := rr.RepLevels()[int(rr.levelsPos):]
+
+       for rr.levelsPos < rr.levelsWritten {
+               curRep, repLevels = repLevels[0], repLevels[1:]
+               if curRep == 0 {
+                       // if at record start, we are seeing the start of a 
record
+                       // for the second time, such as after repeated calls to 
delimitRecords.
+                       // in this case we must continue until we find another 
record start
+                       // or exhaust the column chunk
+                       if !rr.atRecStart {
+                               // end of a record, increment count
+                               recordsRead++
+                               if recordsRead == numRecords {
+                                       // found the number of records we 
wanted, set record start to true and break
+                                       rr.atRecStart = true
+                                       break
+                               }
+                       }
+               }
+               // we have decided to consume the level at this position
+               // advance until we find another boundary
+               rr.atRecStart = false
+
+               curDef, defLevels = defLevels[0], defLevels[1:]
+               if curDef == rr.Descriptor().MaxDefinitionLevel() {
+                       valsToRead++
+               }
+               rr.levelsPos++
+       }
+       return
+}
+
+func (rr *recordReader) ReadRecordData(numRecords int64) (int64, error) {
+       possibleNum := utils.Max(numRecords, rr.levelsWritten-rr.levelsPos)
+       if err := rr.reserveValues(possibleNum); err != nil {
+               return 0, err
+       }
+
+       var (
+               startPos     = rr.levelsPos
+               valuesToRead int64
+               recordsRead  int64
+               nullCount    int64
+               err          error
+       )
+
+       if rr.Descriptor().MaxRepetitionLevel() > 0 {
+               recordsRead, valuesToRead = rr.delimitRecords(numRecords)
+       } else if rr.Descriptor().MaxDefinitionLevel() > 0 {
+               // no repetition levels, skip delimiting logic. each level
+               // represents null or not null entry
+               recordsRead = utils.Min(rr.levelsWritten-rr.levelsPos, 
numRecords)
+               // this is advanced by delimitRecords which we skipped
+               rr.levelsPos += recordsRead
+       } else {
+               recordsRead, valuesToRead = numRecords, numRecords
+       }
+
+       if rr.leafInfo.HasNullableValues() {
+               validityIO := ValidityBitmapInputOutput{
+                       ReadUpperBound:  rr.levelsPos - startPos,
+                       ValidBits:       rr.GetValidBits(),
+                       ValidBitsOffset: rr.recordReaderImpl.ValuesWritten(),
+               }
+               DefLevelsToBitmap(rr.DefLevels()[startPos:int(rr.levelsPos)], 
rr.leafInfo, &validityIO)
+               valuesToRead = validityIO.Read - validityIO.NullCount
+               nullCount = validityIO.NullCount
+               err = rr.ReadValuesSpaced(validityIO.Read, nullCount)
+       } else {
+               err = rr.ReadValuesDense(valuesToRead)
+       }
+       if err != nil {
+               return 0, err
+       }
+
+       if rr.leafInfo.DefLevel > 0 {
+               rr.consumeBufferedValues(rr.levelsPos - startPos)
+       } else {
+               rr.consumeBufferedValues(valuesToRead)
+       }
+
+       // total values, including nullspaces if any
+       rr.IncrementWritten(valuesToRead+nullCount, nullCount)
+       return recordsRead, nil
+}
+
+const minLevelBatchSize = 1024
+
+func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) {
+       // delimit records, then read values at the end
+       recordsRead := int64(0)
+
+       if rr.levelsPos < rr.levelsWritten {
+               additional, err := rr.ReadRecordData(numRecords)
+               if err != nil {
+                       return 0, err
+               }
+               recordsRead += additional
+       }
+
+       levelBatch := utils.Max(minLevelBatchSize, numRecords)
+
+       // if we are in the middle of a record, continue until reaching
+       // the desired number of records or the end of the current record
+       // if we have enough
+       for !rr.atRecStart || recordsRead < numRecords {
+               // is there more data in this row group?
+               if !rr.HasNext() {
+                       if !rr.atRecStart {
+                               // ended the row group while inside a record we 
haven't seen
+                               // the end of yet. increment the record count 
for the last record
+                               // in the row group
+                               recordsRead++
+                               rr.atRecStart = true
+                       }
+                       break
+               }
+
+               // we perform multiple batch reads until we either exhaust the 
row group
+               // or observe the desired number of records
+               batchSize := utils.Min(levelBatch, rr.numAvailValues())
+               if batchSize == 0 {
+                       // no more data in column
+                       break
+               }
+
+               if rr.Descriptor().MaxDefinitionLevel() > 0 {
+                       if err := rr.reserveLevels(batchSize); err != nil {
+                               return 0, err
+                       }
+
+                       defLevels := rr.DefLevels()[int(rr.levelsWritten):]
+
+                       levelsRead := 0
+                       // not present for non-repeated fields
+                       if rr.Descriptor().MaxRepetitionLevel() > 0 {
+                               repLevels := 
rr.RepLevels()[int(rr.levelsWritten):]
+                               levelsRead, _ = 
rr.readDefinitionLevels(defLevels[:batchSize])
+                               if 
rr.readRepetitionLevels(repLevels[:batchSize]) != levelsRead {
+                                       return 0, xerrors.New("number of 
decoded rep/def levels did not match")
+                               }
+                       } else if rr.Descriptor().MaxDefinitionLevel() > 0 {
+                               levelsRead, _ = 
rr.readDefinitionLevels(defLevels[:batchSize])
+                       }
+
+                       if levelsRead == 0 {
+                               // exhausted column chunk
+                               break
+                       }
+
+                       rr.levelsWritten += int64(levelsRead)
+                       read, err := rr.ReadRecordData(numRecords - recordsRead)
+                       if err != nil {
+                               return recordsRead, err
+                       }
+                       recordsRead += read
+               } else {
+                       // no rep or def levels
+                       batchSize = utils.Min(numRecords-recordsRead, batchSize)
+                       read, err := rr.ReadRecordData(batchSize)
+                       if err != nil {
+                               return recordsRead, err
+                       }
+                       recordsRead += read
+               }
+       }
+
+       return recordsRead, nil
+}
+
+func (rr *recordReader) ReleaseValidBits() *memory.Buffer {
+       if rr.leafInfo.HasNullableValues() {
+               return rr.recordReaderImpl.ReleaseValidBits()
+       }
+       return nil
+}
+
+type flbaRecordReader struct {
+       primitiveRecordReader
+
+       bldr     *array.FixedSizeBinaryBuilder
+       valueBuf []parquet.FixedLenByteArray
+}
+
+func (fr *flbaRecordReader) ReserveValues(extra int64, hasNullable bool) error 
{
+       fr.bldr.Reserve(int(extra))
+       return fr.primitiveRecordReader.ReserveValues(extra, hasNullable)
+}
+
+func (fr *flbaRecordReader) Retain() {
+       fr.bldr.Retain()
+       fr.primitiveRecordReader.Retain()
+}
+
+func (fr *flbaRecordReader) Release() {
+       fr.bldr.Release()
+       fr.primitiveRecordReader.Release()
+}
+
+func (fr *flbaRecordReader) ReadValuesDense(toRead int64) error {
+       if int64(cap(fr.valueBuf)) < toRead {
+               fr.valueBuf = make([]parquet.FixedLenByteArray, 0, toRead)
+       }
+
+       values := fr.valueBuf[:toRead]
+       dec := 
fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder)
+
+       _, err := dec.Decode(values)
+       if err != nil {
+               return err
+       }
+
+       for _, val := range values {
+               fr.bldr.Append(val)
+       }
+       fr.ResetValues()
+       return nil
+}
+
+func (fr *flbaRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount int64) 
error {
+       validBits := fr.validBits.Bytes()
+       offset := fr.valuesWritten
+
+       if int64(cap(fr.valueBuf)) < valuesWithNulls {
+               fr.valueBuf = make([]parquet.FixedLenByteArray, 0, 
valuesWithNulls)
+       }
+
+       values := fr.valueBuf[:valuesWithNulls]
+       dec := 
fr.ColumnChunkReader.(*FixedLenByteArrayColumnChunkReader).curDecoder.(encoding.FixedLenByteArrayDecoder)
+       _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset)
+       if err != nil {
+               return err
+       }
+
+       for idx, val := range values {
+               if bitutil.BitIsSet(validBits, int(offset)+idx) {
+                       fr.bldr.Append(val)
+               } else {
+                       fr.bldr.AppendNull()
+               }
+       }
+       fr.ResetValues()
+       return nil
+}
+
+func (fr *flbaRecordReader) GetBuilderChunks() []array.Interface {
+       return []array.Interface{fr.bldr.NewArray()}
+}
+
+func newFLBARecordReader(descr *schema.Column, info LevelInfo, mem 
memory.Allocator) RecordReader {
+       if mem == nil {
+               mem = memory.DefaultAllocator
+       }
+
+       byteWidth := descr.TypeLength()
+
+       return &binaryRecordReader{&recordReader{
+               recordReaderImpl: &flbaRecordReader{
+                       createPrimitiveRecordReader(descr, mem),
+                       array.NewFixedSizeBinaryBuilder(mem, 
&arrow.FixedSizeBinaryType{ByteWidth: byteWidth}),
+                       nil,
+               },
+               leafInfo:  info,
+               defLevels: memory.NewResizableBuffer(mem),
+               repLevels: memory.NewResizableBuffer(mem),
+       }}
+}
+
+type byteArrayRecordReader struct {
+       primitiveRecordReader
+
+       bldr     *array.BinaryBuilder
+       valueBuf []parquet.ByteArray
+}
+
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem 
memory.Allocator) RecordReader {
+       if mem == nil {
+               mem = memory.DefaultAllocator
+       }
+
+       dt := arrow.BinaryTypes.Binary
+       if descr.LogicalType().Equals(schema.StringLogicalType{}) {
+               dt = arrow.BinaryTypes.String
+       }
+
+       return &binaryRecordReader{&recordReader{
+               recordReaderImpl: &byteArrayRecordReader{
+                       createPrimitiveRecordReader(descr, mem),
+                       array.NewBinaryBuilder(mem, dt),
+                       nil,
+               },
+               leafInfo:  info,
+               defLevels: memory.NewResizableBuffer(mem),
+               repLevels: memory.NewResizableBuffer(mem),
+       }}
+}
+
+func (fr *byteArrayRecordReader) ReserveValues(extra int64, hasNullable bool) 
error {
+       fr.bldr.Reserve(int(extra))
+       return fr.primitiveRecordReader.ReserveValues(extra, hasNullable)
+}
+
+func (fr *byteArrayRecordReader) Retain() {
+       fr.bldr.Retain()
+       fr.primitiveRecordReader.Retain()
+}
+
+func (fr *byteArrayRecordReader) Release() {
+       fr.bldr.Release()
+       fr.primitiveRecordReader.Release()
+}
+
+func (br *byteArrayRecordReader) ReadValuesDense(toRead int64) error {
+       if int64(cap(br.valueBuf)) < toRead {
+               br.valueBuf = make([]parquet.ByteArray, 0, toRead)
+       }
+
+       values := br.valueBuf[:toRead]
+       dec := 
br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
+
+       _, err := dec.Decode(values)
+       if err != nil {
+               return err
+       }
+
+       for _, val := range values {
+               br.bldr.Append(val)
+       }
+       br.ResetValues()
+       return nil
+}
+
+func (br *byteArrayRecordReader) ReadValuesSpaced(valuesWithNulls, nullCount 
int64) error {
+       validBits := br.validBits.Bytes()
+       offset := br.valuesWritten
+
+       if int64(cap(br.valueBuf)) < valuesWithNulls {
+               br.valueBuf = make([]parquet.ByteArray, 0, valuesWithNulls)
+       }
+
+       values := br.valueBuf[:valuesWithNulls]
+       dec := 
br.ColumnChunkReader.(*ByteArrayColumnChunkReader).curDecoder.(encoding.ByteArrayDecoder)
+       _, err := dec.DecodeSpaced(values, int(nullCount), validBits, offset)
+       if err != nil {
+               return err
+       }
+
+       for idx, val := range values {
+               if bitutil.BitIsSet(validBits, int(offset)+idx) {
+                       br.bldr.Append(val)
+               } else {
+                       br.bldr.AppendNull()
+               }
+       }
+       br.ResetValues()
+       return nil
+}
+
+func (br *byteArrayRecordReader) GetBuilderChunks() []array.Interface {
+       return []array.Interface{br.bldr.NewArray()}
+}
+
+// TODO(mtopol): create optimized readers for dictionary types after 
ARROW-7286 is done
+
+func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem 
memory.Allocator) RecordReader {
+       switch descr.PhysicalType() {
+       case parquet.Types.ByteArray:
+               return newByteArrayRecordReader(descr, info, mem)
+       case parquet.Types.FixedLenByteArray:
+               return newFLBARecordReader(descr, info, mem)
+       default:
+               return newRecordReader(descr, info, mem)
+       }
+}
diff --git a/go/parquet/go.mod b/go/parquet/go.mod
index d1e28dd..25f3f64 100644
--- a/go/parquet/go.mod
+++ b/go/parquet/go.mod
@@ -21,7 +21,7 @@ go 1.15
 require (
        github.com/JohnCGriffin/overflow v0.0.0-20170615021017-4d914c927216
        github.com/andybalholm/brotli v1.0.3
-       github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb
+       github.com/apache/arrow/go/arrow v0.0.0-20211109162857-ed8c76e74346
        github.com/apache/thrift v0.15.0
        github.com/golang/snappy v0.0.3
        github.com/klauspost/asmfmt v1.2.3
@@ -36,3 +36,5 @@ require (
        golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
        gonum.org/v1/gonum v0.9.3
 )
+
+replace github.com/apache/arrow/go/arrow => github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7
diff --git a/go/parquet/go.sum b/go/parquet/go.sum
index 1cdcc47..68a9fd2 100644
--- a/go/parquet/go.sum
+++ b/go/parquet/go.sum
@@ -11,8 +11,6 @@ github.com/ajstarks/svgo 
v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3
 github.com/andybalholm/brotli v1.0.3 
h1:fpcw+r1N1h0Poc1F/pHbW40cUm/lMEQslZtCkBQ0UnM=
 github.com/andybalholm/brotli v1.0.3/go.mod 
h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
 github.com/antihax/optional v1.0.0/go.mod 
h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
-github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb 
h1:4t4siO1kRtmrdKcOKXxZvtFpCP/bJQW7LA3qABUhdEY=
-github.com/apache/arrow/go/arrow v0.0.0-20211025125312-be665ef948cb/go.mod 
h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
 github.com/apache/thrift v0.15.0 
h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y=
 github.com/apache/thrift v0.15.0/go.mod 
h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
 github.com/boombuler/barcode v1.0.0/go.mod 
h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
@@ -57,6 +55,7 @@ github.com/golang/protobuf v1.4.1/go.mod 
h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
 github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.4.3/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/golang/protobuf v1.5.0/go.mod 
h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
+github.com/golang/protobuf v1.5.2 
h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod 
h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
 github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
 github.com/golang/snappy v0.0.3/go.mod 
h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@@ -68,6 +67,7 @@ github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.5/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
 github.com/google/go-cmp v0.5.6/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod 
h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
@@ -101,6 +101,8 @@ github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/yuin/goldmark v1.3.5/go.mod 
h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 github.com/zeebo/xxh3 v0.10.0 h1:1+2Mov9zfxTNUeoDG9k9i13VfxTR0p1JQu8L0vikxB0=
 github.com/zeebo/xxh3 v0.10.0/go.mod 
h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg=
+github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7 
h1:JjJo15d72a2bpc254UpkztI5X62iDEmdfjECOd5jCdQ=
+github.com/zeroshade/arrow/go/arrow v0.0.0-20211109182246-f26a0a7340d7/go.mod 
h1:Q7yQnSMnLvcXlZ8RV+jwz/6y1rQTqbX6C82SndT52Zs=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@@ -147,6 +149,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod 
h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod 
h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
+golang.org/x/net v0.0.0-20210614182718-04defd469f4e 
h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q=
 golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod 
h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -173,6 +176,7 @@ golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod 
h1:bj7SfCRtBDWHUb9sn
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
+golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -208,6 +212,7 @@ google.golang.org/genproto 
v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
 google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod 
h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod 
h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod 
h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
+google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79 
h1:s1jFTXJryg4a1mew7xv03VZD8N9XjxFhk1o4Js4WvPQ=
 google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod 
h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
 google.golang.org/grpc v1.19.0/go.mod 
h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
 google.golang.org/grpc v1.23.0/go.mod 
h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
@@ -216,6 +221,7 @@ google.golang.org/grpc v1.27.0/go.mod 
h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
 google.golang.org/grpc v1.33.1/go.mod 
h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
 google.golang.org/grpc v1.36.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
 google.golang.org/grpc v1.38.0/go.mod 
h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
+google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI=
 google.golang.org/grpc v1.39.0/go.mod 
h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -228,6 +234,7 @@ google.golang.org/protobuf 
v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
 google.golang.org/protobuf v1.25.0/go.mod 
h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
 google.golang.org/protobuf v1.26.0-rc.1/go.mod 
h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
 google.golang.org/protobuf v1.26.0/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
+google.golang.org/protobuf v1.27.1 
h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ=
 google.golang.org/protobuf v1.27.1/go.mod 
h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 
h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/go/parquet/pqarrow/encode_arrow.go 
b/go/parquet/pqarrow/encode_arrow.go
index 65bee37..5d11718 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -35,7 +35,7 @@ import (
 
 func calcLeafCount(dt arrow.DataType) int {
        switch dt.ID() {
-       case arrow.EXTENSION, arrow.UNION:
+       case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION:
                panic("arrow type not implemented")
        case arrow.LIST:
                return calcLeafCount(dt.(*arrow.ListType).Elem())
@@ -158,7 +158,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context) 
error {
        arrCtx := arrowCtxFromContext(ctx)
        for leafIdx := 0; leafIdx < acw.leafCount; leafIdx++ {
                var (
-                       cw  file.ColumnWriter
+                       cw  file.ColumnChunkWriter
                        err error
                )
 
@@ -203,7 +203,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context) 
error {
 // leafArr is always a primitive (possibly dictionary encoded type).
 // Leaf_field_nullable indicates whether the leaf array is considered nullable
 // according to its schema in a Table or its parent array.
-func WriteArrowToColumn(ctx context.Context, cw file.ColumnWriter, leafArr 
array.Interface, defLevels, repLevels []int16, leafFieldNullable bool) error {
+func WriteArrowToColumn(ctx context.Context, cw file.ColumnChunkWriter, 
leafArr array.Interface, defLevels, repLevels []int16, leafFieldNullable bool) 
error {
        // Leaf nulls are canonical when there is only a single null element 
after a list
        // and it is at the leaf.
        colLevelInfo := cw.LevelInfo()
@@ -223,10 +223,11 @@ func WriteArrowToColumn(ctx context.Context, cw 
file.ColumnWriter, leafArr array
 }
 
 type binaryarr interface {
+       ValueBytes() []byte
        ValueOffsets() []int32
 }
 
-func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnWriter, leafArr 
array.Interface, defLevels, repLevels []int16, maybeParentNulls bool) (err 
error) {
+func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, 
leafArr array.Interface, defLevels, repLevels []int16, maybeParentNulls bool) 
(err error) {
        noNulls := cw.Descr().SchemaNode().RepetitionType() == 
parquet.Repetitions.Required || leafArr.NullN() == 0
 
        if ctx.dataBuffer == nil {
@@ -234,7 +235,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
        }
 
        switch wr := cw.(type) {
-       case *file.BooleanColumnWriter:
+       case *file.BooleanColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.BOOL {
                        return xerrors.Errorf("type mismatch, column is %s, 
array is %s", cw.Type(), leafArr.DataType().ID())
                }
@@ -255,7 +256,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                } else {
                        wr.WriteBatchSpaced(data, defLevels, repLevels, 
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
-       case *file.Int32ColumnWriter:
+       case *file.Int32ColumnChunkWriter:
                var data []int32
                switch leafArr.DataType().ID() {
                case arrow.INT32:
@@ -310,7 +311,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
                }
-       case *file.Int64ColumnWriter:
+       case *file.Int64ColumnChunkWriter:
                var data []int64
                switch leafArr.DataType().ID() {
                case arrow.TIMESTAMP:
@@ -372,7 +373,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
                }
-       case *file.Int96ColumnWriter:
+       case *file.Int96ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.TIMESTAMP {
                        return xerrors.New("unsupported arrow type to write to 
Int96 column")
                }
@@ -390,7 +391,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
                }
-       case *file.Float32ColumnWriter:
+       case *file.Float32ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.FLOAT32 {
                        return xerrors.New("invalid column type to write to 
Float")
                }
@@ -399,7 +400,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                } else {
                        
wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, 
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
-       case *file.Float64ColumnWriter:
+       case *file.Float64ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.FLOAT64 {
                        return xerrors.New("invalid column type to write to 
Float")
                }
@@ -408,26 +409,19 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                } else {
                        
wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, 
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
-       case *file.ByteArrayColumnWriter:
+       case *file.ByteArrayColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.STRING && 
leafArr.DataType().ID() != arrow.BINARY {
                        return xerrors.New("invalid column type to write to 
ByteArray")
                }
 
                var (
-                       offsets  = leafArr.(binaryarr).ValueOffsets()
-                       buffer   = leafArr.Data().Buffers()[2]
-                       valueBuf []byte
+                       buffer  = leafArr.(binaryarr).ValueBytes()
+                       offsets = leafArr.(binaryarr).ValueOffsets()
                )
 
-               if buffer == nil {
-                       valueBuf = []byte{}
-               } else {
-                       valueBuf = buffer.Bytes()
-               }
-
                data := make([]parquet.ByteArray, leafArr.Len())
                for i := range data {
-                       data[i] = 
parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+                       data[i] = 
parquet.ByteArray(buffer[offsets[i]:offsets[i+1]])
                }
                if !maybeParentNulls && noNulls {
                        wr.WriteBatch(data, defLevels, repLevels)
@@ -435,7 +429,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                        wr.WriteBatchSpaced(data, defLevels, repLevels, 
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
 
-       case *file.FixedLenByteArrayColumnWriter:
+       case *file.FixedLenByteArrayColumnChunkWriter:
                switch dt := leafArr.DataType().(type) {
                case *arrow.FixedSizeBinaryType:
                        data := make([]parquet.FixedLenByteArray, leafArr.Len())
@@ -451,8 +445,8 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnWriter, leafArr array
                        // parquet decimal are stored with FixedLength values 
where the length is
                        // proportional to the precision. Arrow's Decimal are 
always stored with 16/32
                        // bytes. thus the internal FLBA must be adjusted by 
the offset calculation
-                       offset := dt.BitWidth() - int(DecimalSize(dt.Precision))
-                       ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - 
leafArr.NullN()) * dt.BitWidth())
+                       offset := arrow.Decimal128SizeBytes - 
int(DecimalSize(dt.Precision))
+                       ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - 
leafArr.NullN()) * arrow.Decimal128SizeBytes)
                        scratch := ctx.dataBuffer.Bytes()
                        typeLen := wr.Descr().TypeLength()
                        fixDecimalEndianness := func(in decimal128.Num) 
parquet.FixedLenByteArray {
diff --git a/go/parquet/pqarrow/encode_arrow_test.go 
b/go/parquet/pqarrow/encode_arrow_test.go
index be24497..84d8f22 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -162,7 +162,7 @@ func TestWriteArrowCols(t *testing.T) {
                )
                switch 
expected.Schema().Field(i).Type.(arrow.FixedWidthDataType).BitWidth() {
                case 32:
-                       colReader := rgr.Column(i).(*file.Int32ColumnReader)
+                       colReader := 
rgr.Column(i).(*file.Int32ColumnChunkReader)
                        vals := make([]int32, int(expected.NumRows()))
                        total, read, err = 
colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
                        require.NoError(t, err)
@@ -182,7 +182,7 @@ func TestWriteArrowCols(t *testing.T) {
                                }
                        }
                case 64:
-                       colReader := rgr.Column(i).(*file.Int64ColumnReader)
+                       colReader := 
rgr.Column(i).(*file.Int64ColumnChunkReader)
                        vals := make([]int64, int(expected.NumRows()))
                        total, read, err = 
colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
                        require.NoError(t, err)
@@ -249,7 +249,7 @@ func TestWriteArrowInt96(t *testing.T) {
        tsRdr := rgr.Column(3)
        assert.Equal(t, parquet.Types.Int96, tsRdr.Type())
 
-       rdr := tsRdr.(*file.Int96ColumnReader)
+       rdr := tsRdr.(*file.Int96ColumnChunkReader)
        vals := make([]parquet.Int96, expected.NumRows())
        defLevels := make([]int16, int(expected.NumRows()))
 
@@ -770,7 +770,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
 
        rgw := writer.AppendRowGroup()
        cw, _ := rgw.NextColumn()
-       cw.(*file.ByteArrayColumnWriter).WriteBatch(bigEndian, nil, nil)
+       cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
        cw.Close()
        rgw.Close()
        writer.Close()
@@ -820,21 +820,21 @@ func (ps *ParquetIOTestSuite) 
readAndCheckSingleColumnFile(data []byte, values a
 }
 
 var fullTypeList = []arrow.DataType{
-       arrow.FixedWidthTypes.Boolean,
-       arrow.PrimitiveTypes.Uint8,
-       arrow.PrimitiveTypes.Int8,
-       arrow.PrimitiveTypes.Uint16,
-       arrow.PrimitiveTypes.Int16,
-       arrow.PrimitiveTypes.Uint32,
-       arrow.PrimitiveTypes.Int32,
-       arrow.PrimitiveTypes.Uint64,
-       arrow.PrimitiveTypes.Int64,
-       arrow.FixedWidthTypes.Date32,
-       arrow.PrimitiveTypes.Float32,
-       arrow.PrimitiveTypes.Float64,
-       arrow.BinaryTypes.String,
-       arrow.BinaryTypes.Binary,
-       &arrow.FixedSizeBinaryType{ByteWidth: 10},
+       // arrow.FixedWidthTypes.Boolean,
+       // arrow.PrimitiveTypes.Uint8,
+       // arrow.PrimitiveTypes.Int8,
+       // arrow.PrimitiveTypes.Uint16,
+       // arrow.PrimitiveTypes.Int16,
+       // arrow.PrimitiveTypes.Uint32,
+       // arrow.PrimitiveTypes.Int32,
+       // arrow.PrimitiveTypes.Uint64,
+       // arrow.PrimitiveTypes.Int64,
+       // arrow.FixedWidthTypes.Date32,
+       // arrow.PrimitiveTypes.Float32,
+       // arrow.PrimitiveTypes.Float64,
+       // arrow.BinaryTypes.String,
+       // arrow.BinaryTypes.Binary,
+       // &arrow.FixedSizeBinaryType{ByteWidth: 10},
        &arrow.Decimal128Type{Precision: 1, Scale: 0},
        &arrow.Decimal128Type{Precision: 5, Scale: 4},
        &arrow.Decimal128Type{Precision: 10, Scale: 9},
diff --git a/go/parquet/pqarrow/file_writer.go 
b/go/parquet/pqarrow/file_writer.go
index 1f0a946..5109602 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -143,7 +143,7 @@ func (fw *FileWriter) WriteBuffered(rec array.Record) error 
{
        var (
                recList []array.Record
                maxRows = fw.wr.Properties().MaxRowGroupLength()
-               curRows int64
+               curRows int
                err     error
        )
        if fw.rgw != nil {
@@ -154,12 +154,12 @@ func (fw *FileWriter) WriteBuffered(rec array.Record) 
error {
                fw.NewBufferedRowGroup()
        }
 
-       if curRows+rec.NumRows() <= maxRows {
+       if int64(curRows)+rec.NumRows() <= maxRows {
                recList = []array.Record{rec}
        } else {
-               recList = []array.Record{rec.NewSlice(0, maxRows-curRows)}
+               recList = []array.Record{rec.NewSlice(0, 
maxRows-int64(curRows))}
                defer recList[0].Release()
-               for offset := int64(maxRows - curRows); offset < rec.NumRows(); 
offset += maxRows {
+               for offset := int64(maxRows - int64(curRows)); offset < 
rec.NumRows(); offset += maxRows {
                        s := rec.NewSlice(offset, offset+utils.Min(maxRows, 
rec.NumRows()-offset))
                        defer s.Release()
                        recList = append(recList, s)
diff --git a/go/parquet/pqarrow/path_builder.go 
b/go/parquet/pqarrow/path_builder.go
index a90644c..cd7c577 100644
--- a/go/parquet/pqarrow/path_builder.go
+++ b/go/parquet/pqarrow/path_builder.go
@@ -427,7 +427,7 @@ func (p *pathBuilder) Visit(arr array.Interface) error {
                return nil
        case arrow.EXTENSION:
                return xerrors.New("extension types not implemented yet")
-       case arrow.UNION:
+       case arrow.SPARSE_UNION, arrow.DENSE_UNION:
                return xerrors.New("union types aren't supported in parquet")
        default:
                p.addTerminalInfo(arr)
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index 5c72a41..22810e6 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -913,7 +913,7 @@ func getNestedFactory(origin, inferred arrow.DataType) 
func(fieldList []arrow.Fi
 func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) 
(modified bool, err error) {
        nchildren := len(inferred.Children)
        switch origin.Type.ID() {
-       case arrow.EXTENSION, arrow.UNION, arrow.DICTIONARY:
+       case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION, 
arrow.DICTIONARY:
                err = xerrors.New("unimplemented type")
        case arrow.STRUCT:
                typ := origin.Type.(*arrow.StructType)

Reply via email to