This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new fd2c80fb22 GH-33875: [Go] Handle writing LargeString and LargeBinary types (#33965)
fd2c80fb22 is described below
commit fd2c80fb22d26d78b574b8f397979137b9c615b6
Author: Min-Young Wu <[email protected]>
AuthorDate: Thu Feb 2 08:15:13 2023 -0800
GH-33875: [Go] Handle writing LargeString and LargeBinary types (#33965)
### Rationale for this change
Handle writing the `array.LargeString` and `array.LargeBinary` data types. This allows Parquet files to contain more than 2 GiB of binary data in a single column chunk.
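A minimal write-path sketch (not part of this change) of what this enables: building a table with a `LargeString` column and writing it via `pqarrow.WriteTable`. The module path (`go/v12`), output file name, and error handling are assumptions; adjust them to the Arrow Go version in use.

```go
package main

import (
	"log"
	"os"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
	"github.com/apache/arrow/go/v12/parquet"
	"github.com/apache/arrow/go/v12/parquet/pqarrow"
)

func main() {
	mem := memory.DefaultAllocator

	// LargeString/LargeBinary use 64-bit value offsets, so a single column
	// chunk is no longer limited by the 2 GiB range of 32-bit offsets.
	bldr := array.NewLargeStringBuilder(mem)
	defer bldr.Release()
	bldr.AppendValues([]string{"hello", "world"}, nil)
	arr := bldr.NewArray()
	defer arr.Release()

	field := arrow.Field{Name: "large_string", Type: arrow.BinaryTypes.LargeString, Nullable: true}
	tbl := array.NewTable(
		arrow.NewSchema([]arrow.Field{field}, nil),
		[]arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{arr}))},
		-1,
	)
	defer tbl.Release()

	f, err := os.Create("large_string.parquet") // hypothetical output path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// After this change the ByteArray writer accepts LargeString/LargeBinary leaves.
	if err := pqarrow.WriteTable(tbl, f, tbl.NumRows(),
		parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()); err != nil {
		log.Fatal(err)
	}
}
```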
### Are these changes tested?
Unit tests are included.
### Are there any user-facing changes?
* Closes: #33875
Authored-by: Min-Young Wu <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/parquet/file/record_reader.go | 13 +++++----
go/parquet/internal/testutils/random_arrow.go | 11 ++++++--
go/parquet/pqarrow/column_readers.go | 6 ++--
go/parquet/pqarrow/encode_arrow.go | 25 ++++++++++++-----
go/parquet/pqarrow/encode_arrow_test.go | 40 +++++++++++++++++++++++++--
go/parquet/pqarrow/schema.go | 7 +++--
6 files changed, 80 insertions(+), 22 deletions(-)
diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go
index 2950b1ab0a..6041e4a216 100755
--- a/go/parquet/file/record_reader.go
+++ b/go/parquet/file/record_reader.go
@@ -751,14 +751,15 @@ type byteArrayRecordReader struct {
valueBuf []parquet.ByteArray
}
-func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
+func newByteArrayRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
if mem == nil {
mem = memory.DefaultAllocator
}
- dt := arrow.BinaryTypes.Binary
- if descr.LogicalType().Equals(schema.StringLogicalType{}) {
- dt = arrow.BinaryTypes.String
+ dt, ok := dtype.(arrow.BinaryDataType)
+ // arrow.DecimalType will also come through here, which we want to treat as binary
+ if !ok {
+ dt = arrow.BinaryTypes.Binary
}
return &binaryRecordReader{&recordReader{
@@ -841,10 +842,10 @@ func (br *byteArrayRecordReader) GetBuilderChunks() []arrow.Array {
// TODO(mtopol): create optimized readers for dictionary types after ARROW-7286 is done
-func NewRecordReader(descr *schema.Column, info LevelInfo, readDict bool, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
+func NewRecordReader(descr *schema.Column, info LevelInfo, dtype arrow.DataType, mem memory.Allocator, bufferPool *sync.Pool) RecordReader {
switch descr.PhysicalType() {
case parquet.Types.ByteArray:
- return newByteArrayRecordReader(descr, info, mem, bufferPool)
+ return newByteArrayRecordReader(descr, info, dtype, mem, bufferPool)
case parquet.Types.FixedLenByteArray:
return newFLBARecordReader(descr, info, mem, bufferPool)
default:
diff --git a/go/parquet/internal/testutils/random_arrow.go b/go/parquet/internal/testutils/random_arrow.go
index f21ace3422..719a9f5dd0 100644
--- a/go/parquet/internal/testutils/random_arrow.go
+++ b/go/parquet/internal/testutils/random_arrow.go
@@ -136,8 +136,15 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array {
bldr.Append("test-string")
}
return bldr.NewArray()
- case arrow.BINARY:
- bldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.Binary)
+ case arrow.LARGE_STRING:
+ bldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+ defer bldr.Release()
+ for i := 0; i < size; i++ {
+ bldr.Append("test-large-string")
+ }
+ return bldr.NewArray()
+ case arrow.BINARY, arrow.LARGE_BINARY:
+ bldr := array.NewBinaryBuilder(memory.DefaultAllocator, dt.(arrow.BinaryDataType))
defer bldr.Release()
buf := make([]byte, 12)
diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go
index 0b7e5598d2..c2cabbc441 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -57,7 +57,7 @@ func newLeafReader(rctx *readerCtx, field *arrow.Field, input *columnIterator, l
field: field,
input: input,
descr: input.Descr(),
- recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type.ID() == arrow.DICTIONARY, rctx.mem, bufferPool),
+ recordRdr: file.NewRecordReader(input.Descr(), leafInfo, field.Type, rctx.mem, bufferPool),
props: props,
refCount: 1,
}
@@ -480,7 +480,7 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *
data = transferInt(rdr, valueType)
case arrow.DATE64:
data = transferDate64(rdr, valueType)
- case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING:
+ case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING:
return transferBinary(rdr, valueType), nil
case arrow.DECIMAL:
switch descr.PhysicalType() {
@@ -534,7 +534,7 @@ func transferZeroCopy(rdr file.RecordReader, dt arrow.DataType) arrow.ArrayData
func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked {
brdr := rdr.(file.BinaryRecordReader)
chunks := brdr.GetBuilderChunks()
- if dt == arrow.BinaryTypes.String {
+ if dt == arrow.BinaryTypes.String || dt ==
arrow.BinaryTypes.LargeString {
// convert chunks from binary to string without copying data,
// just changing the interpretation of the metadata
for idx := range chunks {
diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index 717216c3ac..de24f211de 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -231,6 +231,10 @@ type binaryarr interface {
ValueOffsets() []int32
}
+type binary64arr interface {
+ ValueOffsets() []int64
+}
+
func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err error) {
noNulls := cw.Descr().SchemaNode().RepetitionType() == parquet.Repetitions.Required || leafArr.NullN() == 0
@@ -421,12 +425,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.ByteArrayColumnChunkWriter:
- if leafArr.DataType().ID() != arrow.STRING && leafArr.DataType().ID() != arrow.BINARY {
- return xerrors.New("invalid column type to write to ByteArray")
- }
-
var (
- offsets = leafArr.(binaryarr).ValueOffsets()
buffer = leafArr.Data().Buffers()[2]
valueBuf []byte
)
@@ -438,9 +437,21 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
}
data := make([]parquet.ByteArray, leafArr.Len())
- for i := range data {
- data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+ switch leafArr.DataType().ID() {
+ case arrow.BINARY, arrow.STRING:
+ offsets := leafArr.(binaryarr).ValueOffsets()
+ for i := range data {
+ data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+ }
+ case arrow.LARGE_BINARY, arrow.LARGE_STRING:
+ offsets := leafArr.(binary64arr).ValueOffsets()
+ for i := range data {
+ data[i] = parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
+ }
+ default:
+ return xerrors.New(fmt.Sprintf("invalid column type to write to ByteArray: %s", leafArr.DataType().Name()))
}
+
if !maybeParentNulls && noNulls {
wr.WriteBatch(data, defLevels, repLevels)
} else {
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index 5ae228eece..00632b4abb 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -391,7 +391,7 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType {
return schema.NewIntLogicalType(64, true)
case arrow.UINT64:
return schema.NewIntLogicalType(64, false)
- case arrow.STRING:
+ case arrow.STRING, arrow.LARGE_STRING:
return schema.StringLogicalType{}
case arrow.DATE32:
return schema.DateLogicalType{}
@@ -441,7 +441,7 @@ func getPhysicalType(typ arrow.DataType) parquet.Type {
return parquet.Types.Float
case arrow.FLOAT64:
return parquet.Types.Double
- case arrow.BINARY, arrow.STRING:
+ case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING:
return parquet.Types.ByteArray
case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL:
return parquet.Types.FixedLenByteArray
@@ -710,6 +710,42 @@ func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
}
}
+func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ // While we may write using LargeString, when we read, we get an array.String back out.
+ // So we're building a normal array.String to use with array.Equal
+ lsBldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+ defer lsBldr.Release()
+ lbBldr := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.LargeBinary)
+ defer lbBldr.Release()
+
+ for i := 0; i < smallSize; i++ {
+ s := strconv.FormatInt(int64(i), 10)
+ lsBldr.Append(s)
+ lbBldr.Append([]byte(s))
+ }
+
+ lsValues := lsBldr.NewArray()
+ defer lsValues.Release()
+ lbValues := lbBldr.NewArray()
+ defer lbValues.Release()
+
+ lsField := arrow.Field{Name: "large_string", Type: arrow.BinaryTypes.LargeString, Nullable: true}
+ lbField := arrow.Field{Name: "large_binary", Type: arrow.BinaryTypes.LargeBinary, Nullable: true}
+ expected := array.NewTable(
+ arrow.NewSchema([]arrow.Field{lsField, lbField}, nil),
+ []arrow.Column{
+ *arrow.NewColumn(lsField, arrow.NewChunked(lsField.Type, []arrow.Array{lsValues})),
+ *arrow.NewColumn(lbField, arrow.NewChunked(lbField.Type, []arrow.Array{lbValues})),
+ },
+ -1,
+ )
+ defer expected.Release()
+ ps.roundTripTable(expected, true)
+}
+
func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
types := []arrow.DataType{
arrow.FixedWidthTypes.Boolean,
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index 9b5238ed5b..8fb25cc80f 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -292,10 +292,10 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
typ = parquet.Types.Float
case arrow.FLOAT64:
typ = parquet.Types.Double
- case arrow.STRING:
+ case arrow.STRING, arrow.LARGE_STRING:
logicalType = schema.StringLogicalType{}
fallthrough
- case arrow.BINARY:
+ case arrow.BINARY, arrow.LARGE_BINARY:
typ = parquet.Types.ByteArray
case arrow.FIXED_SIZE_BINARY:
typ = parquet.Types.FixedLenByteArray
@@ -1001,6 +1001,9 @@ func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (mo
inferred.Field.Type = origin.Type
}
modified = true
+ case arrow.LARGE_STRING, arrow.LARGE_BINARY:
+ inferred.Field.Type = origin.Type
+ modified = true
}
if origin.HasMetadata() {