This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 370dc98  feat(parquet/pqarrow): Add ForceLarge option (#197)
370dc98 is described below

commit 370dc9853d6650ecac03c9555b7f04d67689bd25
Author: Matt Topol <[email protected]>
AuthorDate: Mon Dec 9 14:20:22 2024 -0500

    feat(parquet/pqarrow): Add ForceLarge option (#197)
    
    ### Rationale for this change
    closes #195
    
    For parquet files that contain more than 2GB of data in a column, we
    should allow a user to force using the LargeString/LargeBinary variants
    without requiring a stored schema.
    
    ### What changes are included in this PR?
    Adds `ForceLarge` option to `pqarrow.ArrowReadProperties` and enables it
    to force usage of `LargeString` and `LargeBinary` data types.
    
    ### Are these changes tested?
    Yes, a unit test is added.
    
    ### Are there any user-facing changes?
    No breaking changes, only the addition of a new option.
---
 parquet/pqarrow/encode_arrow_test.go | 47 ++++++++++++++++++++++++++++++++++++
 parquet/pqarrow/file_reader.go       |  6 ++++-
 parquet/pqarrow/properties.go        | 32 +++++++++++++++++++++++-
 parquet/pqarrow/schema.go            | 18 ++++++++++++++
 4 files changed, 101 insertions(+), 2 deletions(-)

diff --git a/parquet/pqarrow/encode_arrow_test.go b/parquet/pqarrow/encode_arrow_test.go
index b75a5c0..78e9c68 100644
--- a/parquet/pqarrow/encode_arrow_test.go
+++ b/parquet/pqarrow/encode_arrow_test.go
@@ -1945,6 +1945,53 @@ func TestParquetArrowIO(t *testing.T) {
        suite.Run(t, new(ParquetIOTestSuite))
 }
 
+func TestForceLargeTypes(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       sc := arrow.NewSchema([]arrow.Field{
+               {Name: "str", Type: arrow.BinaryTypes.LargeString},
+               {Name: "bin", Type: arrow.BinaryTypes.LargeBinary},
+       }, nil)
+
+       bldr := array.NewRecordBuilder(mem, sc)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.LargeStringBuilder).AppendValues([]string{"hello", "foo", "bar"}, nil)
+       bldr.Field(1).(*array.BinaryBuilder).AppendValues([][]byte{[]byte("hello"), []byte("foo"), []byte("bar")}, nil)
+
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       var buf bytes.Buffer
+       wr, err := pqarrow.NewFileWriter(sc, &buf,
+               parquet.NewWriterProperties(),
+               pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+       require.NoError(t, err)
+
+       require.NoError(t, wr.Write(rec))
+       require.NoError(t, wr.Close())
+
+       rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err)
+       defer rdr.Close()
+
+       props := pqarrow.ArrowReadProperties{}
+       props.SetForceLarge(0, true)
+       props.SetForceLarge(1, true)
+       pqrdr, err := pqarrow.NewFileReader(rdr, props, mem)
+       require.NoError(t, err)
+
+       recrdr, err := pqrdr.GetRecordReader(context.Background(), nil, nil)
+       require.NoError(t, err)
+       defer recrdr.Release()
+
+       got, err := recrdr.Read()
+       require.NoError(t, err)
+
+       assert.Truef(t, array.RecordEqual(rec, got), "expected: %s\ngot: %s", rec, got)
+}
+
 func TestBufferedRecWrite(t *testing.T) {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(t, 0)
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c9a83b8..d6eae17 100755
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -471,9 +471,13 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
                nrows += fr.rdr.MetaData().RowGroup(rg).NumRows()
        }
 
+       batchSize := fr.Props.BatchSize
+       if fr.Props.BatchSize <= 0 {
+               batchSize = nrows
+       }
        return &recordReader{
                numRows:      nrows,
-               batchSize:    fr.Props.BatchSize,
+               batchSize:    batchSize,
                parallel:     fr.Props.Parallel,
                sc:           sc,
                fieldReaders: readers,
diff --git a/parquet/pqarrow/properties.go b/parquet/pqarrow/properties.go
index dc7f4e0..d349a39 100755
--- a/parquet/pqarrow/properties.go
+++ b/parquet/pqarrow/properties.go
@@ -166,7 +166,37 @@ type ArrowReadProperties struct {
        // BatchSize is the size used for calls to NextBatch when reading whole columns
        BatchSize int64
 
-       readDictIndices map[int]struct{}
+       readDictIndices   map[int]struct{}
+       forceLargeIndices map[int]struct{}
+}
+
+// SetForceLarge determines whether a particular column, if it is String or Binary,
+// will use the LargeString/LargeBinary variants (with int64 offsets) instead of int32
+// offsets. This is specifically useful if you know that particular columns contain more
+// than 2GB worth of byte data which would prevent use of int32 offsets.
+//
+// Passing false will use the default variants while passing true will use the large
+// variant. If the passed column index is not a string or binary column, then this will
+// have no effect.
+func (props *ArrowReadProperties) SetForceLarge(colIdx int, forceLarge bool) {
+       if props.forceLargeIndices == nil {
+               props.forceLargeIndices = make(map[int]struct{})
+       }
+
+       if forceLarge {
+               props.forceLargeIndices[colIdx] = struct{}{}
+       } else {
+               delete(props.forceLargeIndices, colIdx)
+       }
+}
+
+func (props *ArrowReadProperties) ForceLarge(colIdx int) bool {
+       if props.forceLargeIndices == nil {
+               return false
+       }
+
+       _, ok := props.forceLargeIndices[colIdx]
+       return ok
 }
 
 // SetReadDict determines whether to read a particular column as dictionary
diff --git a/parquet/pqarrow/schema.go b/parquet/pqarrow/schema.go
index 6d30359..efb9551 100644
--- a/parquet/pqarrow/schema.go
+++ b/parquet/pqarrow/schema.go
@@ -717,6 +717,15 @@ func listToSchemaField(n *schema.GroupNode, currentLevels file.LevelInfo, ctx *s
                        arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
                }
 
+               if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+                       switch arrowType.ID() {
+                       case arrow.STRING:
+                               arrowType = arrow.BinaryTypes.LargeString
+                       case arrow.BINARY:
+                               arrowType = arrow.BinaryTypes.LargeBinary
+                       }
+               }
+
                itemField := arrow.Field{Name: listNode.Name(), Type: arrowType, Nullable: false, Metadata: createFieldMeta(int(listNode.FieldID()))}
                populateLeaf(colIndex, &itemField, currentLevels, ctx, out, &out.Children[0])
        }
@@ -891,6 +900,15 @@ func nodeToSchemaField(n schema.Node, currentLevels file.LevelInfo, ctx *schemaT
                arrowType = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrowType}
        }
 
+       if arrow.IsBinaryLike(arrowType.ID()) && ctx.props.ForceLarge(colIndex) {
+               switch arrowType.ID() {
+               case arrow.STRING:
+                       arrowType = arrow.BinaryTypes.LargeString
+               case arrow.BINARY:
+                       arrowType = arrow.BinaryTypes.LargeBinary
+               }
+       }
+
        if primitive.RepetitionType() == parquet.Repetitions.Repeated {
                // one-level list encoding e.g. a: repeated int32;
                repeatedAncestorDefLevel := currentLevels.IncrementRepeated()

Reply via email to