This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new fd2c80fb22 GH-33875: [Go] Handle writing LargeString and LargeBinary types (#33965)
fd2c80fb22 is described below

commit fd2c80fb22d26d78b574b8f397979137b9c615b6
Author: Min-Young Wu <[email protected]>
AuthorDate: Thu Feb 2 08:15:13 2023 -0800

    GH-33875: [Go] Handle writing LargeString and LargeBinary types (#33965)
    
    
    
    ### Rationale for this change
    
    Handle writing `array.LargeString` and `array.LargeBinary` data types. This allows Parquet files to contain more than 2 GB of binary data in a single column chunk.
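    
    As an illustration (a hedged sketch, not part of this commit), here is the kind of write this enables, using `pqarrow.WriteTable`; the column name, file name, and `v11` module path are assumptions:
    
    ```go
    package main
    
    import (
        "os"
    
        "github.com/apache/arrow/go/v11/arrow"
        "github.com/apache/arrow/go/v11/arrow/array"
        "github.com/apache/arrow/go/v11/arrow/memory"
        "github.com/apache/arrow/go/v11/parquet"
        "github.com/apache/arrow/go/v11/parquet/pqarrow"
    )
    
    func main() {
        // LargeString carries 64-bit value offsets, so a single column
        // chunk is no longer capped by int32 offsets at 2 GB of values.
        bldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
        defer bldr.Release()
        bldr.Append("hello")
        bldr.Append("world")
        arr := bldr.NewArray()
        defer arr.Release()
    
        field := arrow.Field{Name: "large_col", Type: arrow.BinaryTypes.LargeString, Nullable: true}
        col := arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{arr}))
        defer col.Release()
        tbl := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil), []arrow.Column{*col}, -1)
        defer tbl.Release()
    
        f, err := os.Create("large.parquet")
        if err != nil {
            panic(err)
        }
        defer f.Close()
    
        // WithStoreSchema embeds the Arrow schema in the file metadata so a
        // reader can restore LargeString instead of inferring plain String.
        if err := pqarrow.WriteTable(tbl, f, tbl.NumRows(),
            parquet.NewWriterProperties(), pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())); err != nil {
            panic(err)
        }
    }
    ```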
    
    ### Are these changes tested?
    
    Unit tests included
    
    ### Are there any user-facing changes?
    
    * Closes: #33875
    
    Authored-by: Min-Young Wu <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/parquet/file/record_reader.go              | 13 +++++----
 go/parquet/internal/testutils/random_arrow.go | 11 ++++++--
 go/parquet/pqarrow/column_readers.go          |  6 ++--
 go/parquet/pqarrow/encode_arrow.go            | 25 ++++++++++++-----
 go/parquet/pqarrow/encode_arrow_test.go       | 40 +++++++++++++++++++++++++--
 go/parquet/pqarrow/schema.go                  |  7 +++--
 6 files changed, 80 insertions(+), 22 deletions(-)

diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
index 2950b1ab0a..6041e4a216 100755
--- a/go/parquet/file/record_reader.go
+++ b/go/parquet/file/record_reader.go
@@ -751,14 +751,15 @@ type byteArrayRecordReader struct {
        valueBuf []parquet.ByteArray
 }
 
-func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        if mem == nil {
                mem = memory.DefaultAllocator
        }
 
-       dt := arrow.BinaryTypes.Binary
-       if descr.LogicalType().Equals(schema.StringLogicalType{}) {
-               dt = arrow.BinaryTypes.String
+       dt, ok := dtype.(arrow.BinaryDataType)
+       // arrow.DecimalType will also come through here, which we want to treat as binary
+       if !ok {
+               dt = arrow.BinaryTypes.Binary
        }
 
        return &binaryRecordReader{&recordReader{
@@ -841,10 +842,10 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
 
 // TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done
 
-func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
+func NewRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
        switch descr.PhysicalType() {
        case parquet.Types.ByteArray:
-               return newByteArrayRecordReader(descr, info, mem, bufferPool)
+               return newByteArrayRecordReader(descr, info, dtype, mem, bufferPool)
        case parquet.Types.FixedLenByteArray:
                return newFLBARecordReader(descr, info, mem, bufferPool)
        default:
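
For context, a hedged sketch (an assumption, not committed code) of the behavior the record-reader change above relies on: any `arrow.BinaryDataType` (Binary, String, LargeBinary, LargeString) passes through the type assertion unchanged, while anything else read from a ByteArray column, such as a decimal, falls back to plain Binary:

```go
// builderType is a hypothetical helper mirroring the dtype handling in
// newByteArrayRecordReader above.
func builderType(dtype arrow.DataType) arrow.BinaryDataType {
        if dt, ok := dtype.(arrow.BinaryDataType); ok {
                return dt // Binary, String, LargeBinary or LargeString
        }
        return arrow.BinaryTypes.Binary // e.g. decimals stored as ByteArray
}
```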
diff --git a/go/parquet/internal/testutils/random_arrow.go b/go/parquet/internal/testutils/random_arrow.go
index f21ace3422..719a9f5dd0 100644
--- a/go/parquet/internal/testutils/random_arrow.go
+++ b/go/parquet/internal/testutils/random_arrow.go
@@ -136,8 +136,15 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array {
                        bldr.Append("test-string")
                }
                return bldr.NewArray()
-       case arrow.BINARY:
-               bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
+       case arrow.LARGE_STRING:
+               bldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+               defer bldr.Release()
+               for i := 0; i < size; i++ {
+                       bldr.Append("test-large-string")
+               }
+               return bldr.NewArray()
+       case arrow.BINARY, arrow.LARGE_BINARY:
+               bldr := array.NewBinaryBuilder(memory.DefaultAllocator, dt.(arrow.BinaryDataType))
                defer bldr.Release()
 
                buf := make([]byte, 12)
diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go
index 0b7e5598d2..c2cabbc441 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -57,7 +57,7 @@ func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, l
                field:     field,
                input:     input,
                descr:     input.Descr(),
-               recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
+               recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type, rctx.mem, bufferPool),
                props:     props,
                refCount:  1,
        }
@@ -480,7 +480,7 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *
                data = transferInt(rdr, valueType)
        case arrow.DATE64:
                data = transferDate64(rdr, valueType)
-       case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING:
+       case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING:
                return transferBinary(rdr, valueType), nil
        case arrow.DECIMAL:
                switch descr.PhysicalType() {
@@ -534,7 +534,7 @@ func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData
 func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked {
        brdr := rdr.(file.BinaryRecordReader)
        chunks := brdr.GetBuilderChunks()
-       if dt == arrow.BinaryTypes.String {
+       if dt == arrow.BinaryTypes.String || dt == arrow.BinaryTypes.LargeString {
                // convert chunks from binary to string without copying data,
                // just changing the interpretation of the metadata
                for idx := range chunks {
diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index 717216c3ac..de24f211de 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -231,6 +231,10 @@ type binaryarr interface {
        ValueOffsets() []int32
 }
 
+type binary64arr interface {
+       ValueOffsets() []int64
+}
+
 func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
        noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0
 
@@ -421,12 +425,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
                        wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
        case *file.ByteArrayColumnChunkWriter:
-               if leafArr.DataType().ID() != arrow.STRING && leafArr.DataType().ID() != arrow.BINARY {
-                       return xerrors.New("invalid column type to write to ByteArray")
-               }
-
                var (
-                       offsets  = leafArr.(binaryarr).ValueOffsets()
                        buffer   = leafArr.Data().Buffers()[2]
                        valueBuf []byte
                )
@@ -438,9 +437,21 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
                }
 
                data := make([]parquet.ByteArray, leafArr.Len())
-               for i := range data {
-                       data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+               switch leafArr.DataType().ID() {
+               case arrow.BINARY, arrow.STRING:
+                       offsets := leafArr.(binaryarr).ValueOffsets()
+                       for i := range data {
+                               data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+                       }
+               case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+                       offsets := leafArr.(binary64arr).ValueOffsets()
+                       for i := range data {
+                               data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+                       }
+               default:
+                       return xerrors.New(fmt.Sprintf("invalid column type to write to ByteArray: %s", leafArr.DataType().Name()))
                }
+
                if !maybeParentNulls && noNulls {
                        wr.WriteBatch(data, defLevels, repLevels)
                } else {
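
A hedged, self-contained sketch (an assumption, not code from this patch; `toByteArrays` is a hypothetical name) of the offset-width dispatch the hunk above adds, using 32-bit offsets for Binary/String and 64-bit offsets for the large variants:

```go
// toByteArrays slices an Arrow binary-like array into parquet.ByteArray
// views without copying the underlying value bytes.
func toByteArrays(arr arrow.Array) ([]parquet.ByteArray, error) {
        var valueBuf []byte
        if buf := arr.Data().Buffers()[2]; buf != nil {
                valueBuf = buf.Bytes()
        }
        out := make([]parquet.ByteArray, arr.Len())
        switch arr.DataType().ID() {
        case arrow.BINARY, arrow.STRING:
                // int32 offsets cap a column chunk's values at 2 GB.
                offsets := arr.(interface{ ValueOffsets() []int32 }).ValueOffsets()
                for i := range out {
                        out[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
                }
        case arrow.LARGE_BINARY, arrow.LARGE_STRING:
                // int64 offsets lift that limit for the large types.
                offsets := arr.(interface{ ValueOffsets() []int64 }).ValueOffsets()
                for i := range out {
                        out[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
                }
        default:
                return nil, fmt.Errorf("invalid column type to write to ByteArray: %s", arr.DataType().Name())
        }
        return out, nil
}
```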
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index 5ae228eece..00632b4abb 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -391,7 +391,7 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType {
                return schema.NewIntLogicalType(64, true)
        case arrow.UINT64:
                return schema.NewIntLogicalType(64, false)
-       case arrow.STRING:
+       case arrow.STRING, arrow.LARGE_STRING:
                return schema.StringLogicalType{}
        case arrow.DATE32:
                return schema.DateLogicalType{}
@@ -441,7 +441,7 @@ func getPhysicalType(typ arrow.DataType) parquet.Type {
                return parquet.Types.Float
        case arrow.FLOAT64:
                return parquet.Types.Double
-       case arrow.BINARY, arrow.STRING:
+       case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING:
                return parquet.Types.ByteArray
        case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL:
                return parquet.Types.FixedLenByteArray
@@ -710,6 +710,42 @@ func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
        }
 }
 
+func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       // Build LargeString and LargeBinary arrays directly; since the Arrow
+       // schema is stored in the file metadata, the round trip restores the
+       // large types, so the expected table can use them as-is.
+       lsBldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+       defer lsBldr.Release()
+       lbBldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.LargeBinary)
+       defer lbBldr.Release()
+
+       for i := 0; i < smallSize; i++ {
+               s := strconv.FormatInt(int64(i), 10)
+               lsBldr.Append(s)
+               lbBldr.Append([]byte(s))
+       }
+
+       lsValues := lsBldr.NewArray()
+       defer lsValues.Release()
+       lbValues := lbBldr.NewArray()
+       defer lbValues.Release()
+
+       lsField := arrow.Field{Name: "large_string", Type: arrow.BinaryTypes.LargeString, Nullable: true}
+       lbField := arrow.Field{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary, Nullable: true}
+       expected := array.NewTable(
+               arrow.NewSchema([]arrow.Field{lsField, lbField}, nil),
+               []arrow.Column{
+                       *arrow.NewColumn(lsField, arrow.NewChunked(lsField.Type, []arrow.Array{lsValues})),
+                       *arrow.NewColumn(lbField, arrow.NewChunked(lbField.Type, []arrow.Array{lbValues})),
+               },
+               -1,
+       )
+       defer expected.Release()
+       ps.roundTripTable(expected, true)
+}
+
 func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
        types := []arrow.DataType{
                arrow.FixedWidthTypes.Boolean,
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index 9b5238ed5b..8fb25cc80f 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -292,10 +292,10 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
                typ = parquet.Types.Float
        case arrow.FLOAT64:
                typ = parquet.Types.Double
-       case arrow.STRING:
+       case arrow.STRING, arrow.LARGE_STRING:
                logicalType = schema.StringLogicalType{}
                fallthrough
-       case arrow.BINARY:
+       case arrow.BINARY, arrow.LARGE_BINARY:
                typ = parquet.Types.ByteArray
        case arrow.FIXED_SIZE_BINARY:
                typ = parquet.Types.FixedLenByteArray
@@ -1001,6 +1001,9 @@ func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (mo
                        inferred.Field.Type = origin.Type
                }
                modified = true
+       case arrow.LARGE_STRING, arrow.LARGE_BINARY:
+               inferred.Field.Type = origin.Type
+               modified = true
        }
 
        if origin.HasMetadata() {
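
To see the schema.go change above in action, a hedged sketch (not part of the commit; the file name is hypothetical): because the writer stored the Arrow schema, the reader restores LargeString/LargeBinary instead of the plain String/Binary it would otherwise infer from a Parquet ByteArray column.

```go
// Sketch (assumed v11-era APIs): reading back a file written with
// pqarrow.WithStoreSchema(), e.g. by the writer sketch earlier in this message.
ctx := context.Background()

f, err := os.Open("large.parquet")
if err != nil {
        panic(err)
}
defer f.Close()

tbl, err := pqarrow.ReadTable(ctx, f, parquet.NewReaderProperties(memory.DefaultAllocator),
        pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
if err != nil {
        panic(err)
}
defer tbl.Release()

// With the stored schema applied, the large type round-trips.
fmt.Println(tbl.Schema().Field(0).Type) // expected: large_utf8
```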
