This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a01d9fc feat: support conversion of chunked arrays (#553)
3a01d9fc is described below

commit 3a01d9fc298db6c4cf4641704dfce8efaf8412cf
Author: Ahmed Mezghani <[email protected]>
AuthorDate: Wed Oct 29 17:00:55 2025 +0100

    feat: support conversion of chunked arrays (#553)
    
    ### Rationale for this change
    
    Fixes https://github.com/apache/arrow-go/issues/552
    
    ### What changes are included in this PR?
    
    Make `chunksToSingle` concatenate multiple chunks into a single array
    using `array.Concatenate` instead of returning `ErrNotImplemented`.
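    
    For illustration, a minimal standalone sketch of this concatenation
    pattern (the `combineChunks` helper here is hypothetical; the actual
    change is to `chunksToSingle` in the diff below):
    
    ```go
    package main
    
    import (
        "fmt"
    
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
    )
    
    // combineChunks concatenates every chunk of a chunked array into one
    // ArrayData and passes ownership of the result to the caller.
    func combineChunks(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) {
        concatenated, err := array.Concatenate(chunked.Chunks(), mem)
        if err != nil {
            return nil, err
        }
        defer concatenated.Release()
    
        data := concatenated.Data()
        data.Retain() // keep the buffers alive after the array is released
        return data, nil
    }
    
    func main() {
        mem := memory.DefaultAllocator
    
        bldr := array.NewInt32Builder(mem)
        defer bldr.Release()
    
        bldr.AppendValues([]int32{1, 2}, nil)
        chunk1 := bldr.NewInt32Array()
        defer chunk1.Release()
    
        bldr.AppendValues([]int32{3, 4}, nil)
        chunk2 := bldr.NewInt32Array()
        defer chunk2.Release()
    
        chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
        defer chunked.Release()
    
        data, err := combineChunks(chunked, mem)
        if err != nil {
            panic(err)
        }
        defer data.Release()
    
        combined := array.MakeFromData(data)
        defer combined.Release()
        fmt.Println(combined) // [1 2 3 4]
    }
    ```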
    
    ### Are these changes tested?
    
    Unit tests, plus end-to-end testing locally on a parquet file that
    contains chunked arrays.
    
    ### Are there any user-facing changes?
    
    Changes are backward compatible.
---
 parquet/pqarrow/column_readers.go      |  21 ++-
 parquet/pqarrow/column_readers_test.go | 275 +++++++++++++++++++++++++++++++++
 parquet/pqarrow/file_reader.go         |   4 +-
 3 files changed, 292 insertions(+), 8 deletions(-)

diff --git a/parquet/pqarrow/column_readers.go b/parquet/pqarrow/column_readers.go
index 3ea81a71..1be8a48e 100644
--- a/parquet/pqarrow/column_readers.go
+++ b/parquet/pqarrow/column_readers.go
@@ -319,7 +319,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
                        return nil, err
                }
 
-               childArrData[i], err = chunksToSingle(field)
+               childArrData[i], err = chunksToSingle(field, sr.rctx.mem)
                field.Release() // release field before checking
                if err != nil {
                        return nil, err
@@ -442,7 +442,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
                validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read)))
        }
 
-       item, err := chunksToSingle(arr)
+       item, err := chunksToSingle(arr, lr.rctx.mem)
        if err != nil {
                return nil, err
        }
@@ -489,9 +489,7 @@ func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.Level
 }
 
 // helper function to combine chunks into a single array.
-//
-// nested data conversion for chunked array outputs not yet implemented
-func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
+func chunksToSingle(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) {
        switch len(chunked.Chunks()) {
        case 0:
                return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
@@ -499,8 +497,17 @@ func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
                data := chunked.Chunk(0).Data()
                data.Retain() // we pass control to the caller
                return data, nil
-       default: // if an item reader yields a chunked array, this is not yet implemented
-               return nil, arrow.ErrNotImplemented
+       default:
+               // concatenate multiple chunks into a single array
+               concatenated, err := array.Concatenate(chunked.Chunks(), mem)
+               if err != nil {
+                       return nil, err
+               }
+               defer concatenated.Release()
+
+               data := concatenated.Data()
+               data.Retain()
+               return data, nil
        }
 }
 
diff --git a/parquet/pqarrow/column_readers_test.go b/parquet/pqarrow/column_readers_test.go
new file mode 100644
index 00000000..6a86b3f9
--- /dev/null
+++ b/parquet/pqarrow/column_readers_test.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pqarrow
+
+import (
+       "bytes"
+       "context"
+       "io"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/compress"
+       "github.com/apache/arrow-go/v18/parquet/file"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+func TestChunksToSingle(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       t.Run("empty chunked array", func(t *testing.T) {
+               chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{})
+               defer chunked.Release()
+
+               result, err := chunksToSingle(chunked, mem)
+               require.NoError(t, err)
+               defer result.Release()
+
+               assert.Equal(t, 0, result.Len())
+       })
+
+       t.Run("single chunk", func(t *testing.T) {
+               bldr := array.NewInt32Builder(mem)
+               defer bldr.Release()
+               bldr.AppendValues([]int32{1, 2, 3, 4, 5}, nil)
+               arr := bldr.NewInt32Array()
+               defer arr.Release()
+
+               chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr})
+               defer chunked.Release()
+
+               result, err := chunksToSingle(chunked, mem)
+               require.NoError(t, err)
+               defer result.Release()
+
+               assert.Equal(t, 5, result.Len())
+       })
+
+       t.Run("multiple chunks", func(t *testing.T) {
+               bldr := array.NewInt32Builder(mem)
+               defer bldr.Release()
+
+               bldr.AppendValues([]int32{1, 2, 3}, nil)
+               chunk1 := bldr.NewInt32Array()
+               defer chunk1.Release()
+
+               bldr.AppendValues([]int32{4, 5, 6}, nil)
+               chunk2 := bldr.NewInt32Array()
+               defer chunk2.Release()
+
+               bldr.AppendValues([]int32{7, 8, 9, 10}, nil)
+               chunk3 := bldr.NewInt32Array()
+               defer chunk3.Release()
+
+               chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2, chunk3})
+               defer chunked.Release()
+
+               result, err := chunksToSingle(chunked, mem)
+               require.NoError(t, err)
+               defer result.Release()
+
+               assert.Equal(t, 10, result.Len())
+
+               // Verify concatenated values
+               resultArr := array.MakeFromData(result).(*array.Int32)
+               defer resultArr.Release()
+               for i, expected := range []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
+                       assert.Equal(t, expected, resultArr.Value(i))
+               }
+       })
+
+       t.Run("multiple chunks with nulls", func(t *testing.T) {
+               bldr := array.NewInt32Builder(mem)
+               defer bldr.Release()
+
+               bldr.AppendValues([]int32{1, 0, 3}, []bool{true, false, true})
+               chunk1 := bldr.NewInt32Array()
+               defer chunk1.Release()
+
+               bldr.AppendValues([]int32{4, 0, 6}, []bool{true, false, true})
+               chunk2 := bldr.NewInt32Array()
+               defer chunk2.Release()
+
+               chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
+               defer chunked.Release()
+
+               result, err := chunksToSingle(chunked, mem)
+               require.NoError(t, err)
+               defer result.Release()
+
+               assert.Equal(t, 6, result.Len())
+               assert.Equal(t, 2, result.NullN())
+
+               resultArr := array.MakeFromData(result).(*array.Int32)
+               defer resultArr.Release()
+               assert.False(t, resultArr.IsValid(1))
+               assert.False(t, resultArr.IsValid(4))
+               assert.Equal(t, int32(1), resultArr.Value(0))
+               assert.Equal(t, int32(3), resultArr.Value(2))
+       })
+
+       t.Run("multiple chunks string type", func(t *testing.T) {
+               bldr := array.NewStringBuilder(mem)
+               defer bldr.Release()
+
+               bldr.AppendValues([]string{"hello", "world"}, nil)
+               chunk1 := bldr.NewStringArray()
+               defer chunk1.Release()
+
+               bldr.AppendValues([]string{"arrow", "parquet"}, nil)
+               chunk2 := bldr.NewStringArray()
+               defer chunk2.Release()
+
+               chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{chunk1, chunk2})
+               defer chunked.Release()
+
+               result, err := chunksToSingle(chunked, mem)
+               require.NoError(t, err)
+               defer result.Release()
+
+               assert.Equal(t, 4, result.Len())
+
+               resultArr := array.MakeFromData(result).(*array.String)
+               defer resultArr.Release()
+               assert.Equal(t, "hello", resultArr.Value(0))
+               assert.Equal(t, "parquet", resultArr.Value(3))
+       })
+}
+
+func TestChunkedTableRoundTrip(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "int64_col", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+                       {Name: "string_col", Type: arrow.BinaryTypes.String, Nullable: true},
+               },
+               nil,
+       )
+
+       // Test data across 3 chunks: 5 + 3 + 2 = 10 rows
+       allInt64Values := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
+       allStringValues := []string{"hello", "world", "arrow", "parquet", "go", "chunked", "table", "test", "final", "chunk"}
+
+       var buf bytes.Buffer
+       props := parquet.NewWriterProperties(
+               parquet.WithCompression(compress.Codecs.Snappy),
+               parquet.WithAllocator(mem),
+       )
+
+       writerProps := NewArrowWriterProperties(
+               WithAllocator(mem),
+       )
+
+       writer, err := NewFileWriter(schema, &buf, props, writerProps)
+       require.NoError(t, err)
+
+       // Write three chunks: 5 rows, 3 rows, 2 rows
+       chunks := []struct{ start, end int }{
+               {0, 5},  // First chunk: 5 rows
+               {5, 8},  // Second chunk: 3 rows
+               {8, 10}, // Third chunk: 2 rows
+       }
+
+       for _, chunk := range chunks {
+               int64Builder := array.NewInt64Builder(mem)
+               int64Builder.AppendValues(allInt64Values[chunk.start:chunk.end], nil)
+               int64Arr := int64Builder.NewInt64Array()
+               int64Builder.Release()
+
+               stringBuilder := array.NewStringBuilder(mem)
+               stringBuilder.AppendValues(allStringValues[chunk.start:chunk.end], nil)
+               stringArr := stringBuilder.NewStringArray()
+               stringBuilder.Release()
+
+               rec := array.NewRecordBatch(schema, []arrow.Array{int64Arr, stringArr}, int64(chunk.end-chunk.start))
+
+               err = writer.Write(rec)
+               require.NoError(t, err)
+
+               rec.Release()
+               int64Arr.Release()
+               stringArr.Release()
+       }
+
+       err = writer.Close()
+       require.NoError(t, err)
+
+       // Read back from parquet
+       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
+               file.WithReadProps(parquet.NewReaderProperties(mem)))
+       require.NoError(t, err)
+       defer pf.Close()
+
+       reader, err := NewFileReader(pf, ArrowReadProperties{}, mem)
+       require.NoError(t, err)
+
+       rr, err := reader.GetRecordReader(context.Background(), nil, nil)
+       require.NoError(t, err)
+       defer rr.Release()
+
+       var records []arrow.RecordBatch
+       for {
+               rec, err := rr.Read()
+               if err == io.EOF {
+                       break
+               }
+               require.NoError(t, err)
+               rec.Retain()
+               records = append(records, rec)
+       }
+
+       readTable := array.NewTableFromRecords(schema, records)
+       defer readTable.Release()
+
+       for _, rec := range records {
+               rec.Release()
+       }
+
+       // Verify the read table
+       require.Equal(t, int64(10), readTable.NumRows())
+       require.Equal(t, int64(2), readTable.NumCols())
+
+       // Verify int64 column values
+       int64Col := readTable.Column(0).Data()
+       int64Single, err := chunksToSingle(int64Col, mem)
+       require.NoError(t, err)
+       defer int64Single.Release()
+       int64Arr := array.MakeFromData(int64Single).(*array.Int64)
+       defer int64Arr.Release()
+       for i := 0; i < int64Arr.Len(); i++ {
+               assert.Equal(t, allInt64Values[i], int64Arr.Value(i))
+       }
+
+       // Verify string column values
+       stringCol := readTable.Column(1).Data()
+       stringSingle, err := chunksToSingle(stringCol, mem)
+       require.NoError(t, err)
+       defer stringSingle.Release()
+       stringArr := array.MakeFromData(stringSingle).(*array.String)
+       defer stringArr.Release()
+       for i := 0; i < stringArr.Len(); i++ {
+               assert.Equal(t, allStringValues[i], stringArr.Value(i))
+       }
+}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c85b2fc9..f59a589e 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -527,6 +527,7 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
                parallel:     fr.Props.Parallel,
                sc:           sc,
                fieldReaders: readers,
+               mem:          fr.mem,
        }
        rr.refCount.Add(1)
        return rr, nil
@@ -721,6 +722,7 @@ type recordReader struct {
        fieldReaders []*ColumnReader
        cur          arrow.RecordBatch
        err          error
+       mem          memory.Allocator
 
        refCount atomic.Int64
 }
@@ -789,7 +791,7 @@ func (r *recordReader) next() bool {
                        return io.EOF
                }
 
-               arrdata, err := chunksToSingle(data)
+               arrdata, err := chunksToSingle(data, r.mem)
                if err != nil {
                        return err
                }
