This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 370dc98 feat(parquet/pqarrow): Add ForceLarge option (#197)
370dc98 is described below
commit 370dc9853d6650ecac03c9555b7f04d67689bd25
Author: Matt Topol <[email protected]>
AuthorDate: Mon Dec 9 14:20:22 2024 -0500
feat(parquet/pqarrow): Add ForceLarge option (#197)
### Rationale for this change
closes #195
For parquet files that contain more than 2GB of data in a column, we
should allow a user to force using the LargeString/LargeBinary variants
without requiring a stored schema.
### What changes are included in this PR?
Adds `ForceLarge` option to `pqarrow.ArrowReadProperties` and enables it
to force usage of `LargeString` and `LargeBinary` data types.
### Are these changes tested?
Yes, a unit test is added.
### Are there any user-facing changes?
No breaking changes, only the addition of a new option.
---
parquet/pqarrow/encode_arrow_test.go | 47 ++++++++++++++++++++++++++++++++++++
parquet/pqarrow/file_reader.go | 6 ++++-
parquet/pqarrow/properties.go | 32 +++++++++++++++++++++++-
parquet/pqarrow/schema.go | 18 ++++++++++++++
4 files changed, 101 insertions(+), 2 deletions(-)
diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b75a5c0..78e9c68 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,53 @@ func TestParquetArrowIO(t *testing.T) {
suite.Run(t, new(ParquetIOTestSuite))
}
+func TestForceLargeTypes(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ sc := arrow.NewSchema([]arrow.Field{
+ {Name: "str", Type: arrow.BinaryTypes.LargeString},
+ {Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
+ }, nil)
+
+ bldr := array.NewRecordBuilder(mem, sc)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
+ bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)
+
+ rec := bldr.NewRecord()
+ defer rec.Release()
+
+ var buf bytes.Buffer
+ wr, err := pqarrow.NewFileWriter(sc, &buf,
+ parquet.NewWriterProperties(),
+ pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+ require.NoError(t, err)
+
+ require.NoError(t, wr.Write(rec))
+ require.NoError(t, wr.Close())
+
+ rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ props := pqarrow.ArrowReadProperties{}
+ props.SetForceLarge(0, true)
+ props.SetForceLarge(1, true)
+ pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
+ require.NoError(t, err)
+
+ recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+ defer recrdr.Release()
+
+ got, err := recrdr.Read()
+ require.NoError(t, err)
+
+ assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
+}
+
func TestBufferedRecWrite(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c9a83b8..d6eae17 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
}
+ batchSize := fr.Props.BatchSize
+ if fr.Props.BatchSize <= 0 {
+ batchSize = nrows
+ }
return &recordReader{
numRows: nrows,
- batchSize: fr.Props.BatchSize,
+ batchSize: batchSize,
parallel: fr.Props.Parallel,
sc: sc,
fieldReaders: readers,
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index dc7f4e0..d349a39 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -166,7 +166,37 @@ type ArrowReadProperties struct {
// BatchSize is the size used for calls to NextBatch when reading whole columns
BatchSize int64
- readDictIndices map[int]struct{}
+ readDictIndices map[int]struct{}
+ forceLargeIndices map[int]struct{}
+}
+
+// SetForceLarge determines whether a particular column, if it is String or Binary,
+// will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32
+// offsets. This is specifically useful if you know that particular columns contain more
+// than 2GB worth of byte data which would prevent use of int32 offsets.
+//
+// Passing false will use the default variants while passing true will use the large
+// variant. If the passed column index is not a string or binary column, then this will
+// have no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+ if props.forceLargeIndices == nil {
+ props.forceLargeIndices = make(map[int]struct{})
+ }
+
+ if forceLarge {
+ props.forceLargeIndices[colIdx] = struct{}{}
+ } else {
+ delete(props.forceLargeIndices, colIdx)
+ }
+}
+
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+ if props.forceLargeIndices == nil {
+ return false
+ }
+
+ _, ok := props.forceLargeIndices[colIdx]
+ return ok
}
// SetReadDict determines whether to read a particular column as dictionary
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 6d30359..efb9551 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -717,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
}
+ if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+ switch arrowType.ID() {
+ case arrow.STRING:
+ arrowType = arrow.BinaryTypes.LargeString
+ case arrow.BINARY:
+ arrowType = arrow.BinaryTypes.LargeBinary
+ }
+ }
+
itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
}
@@ -891,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
}
+ if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+ switch arrowType.ID() {
+ case arrow.STRING:
+ arrowType = arrow.BinaryTypes.LargeString
+ case arrow.BINARY:
+ arrowType = arrow.BinaryTypes.LargeBinary
+ }
+ }
+
if primitive.RepetitionType() == parquet.Repetitions.Repeated {
// one-level list encoding e.g. a: repeated int32;
repeatedAncestorDefLevel := currentLevels.IncrementRepeated()