This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3a01d9fc feat: support conversion of chunked arrays (#553)
3a01d9fc is described below
commit 3a01d9fc298db6c4cf4641704dfce8efaf8412cf
Author: Ahmed Mezghani <[email protected]>
AuthorDate: Wed Oct 29 17:00:55 2025 +0100
feat: support conversion of chunked arrays (#553)
### Rationale for this change
Fixes https://github.com/apache/arrow-go/issues/552
### What changes are included in this PR?
Make `chunksToSingle` concatenate multiple chunks into a single array
using `array.Concatenate` instead of returning `ErrNotImplemented`
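For context, here is a minimal standalone sketch (not part of this commit) of the public `array.Concatenate` call that the new fallback relies on; the builder setup and data values are purely illustrative:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	mem := memory.DefaultAllocator

	// Build two small chunks of the same type (illustrative data).
	bldr := array.NewInt32Builder(mem)
	defer bldr.Release()

	bldr.AppendValues([]int32{1, 2, 3}, nil)
	chunk1 := bldr.NewInt32Array()
	defer chunk1.Release()

	bldr.AppendValues([]int32{4, 5}, nil)
	chunk2 := bldr.NewInt32Array()
	defer chunk2.Release()

	// array.Concatenate copies the chunks into one contiguous array;
	// this is the fallback chunksToSingle now takes for >1 chunk.
	combined, err := array.Concatenate([]arrow.Array{chunk1, chunk2}, mem)
	if err != nil {
		panic(err)
	}
	defer combined.Release()

	fmt.Println(combined.Len()) // 5
}
```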
### Are these changes tested?
Unit tests, plus end-to-end testing locally on a Parquet file that contains
chunked arrays
### Are there any user-facing changes?
Changes are backward compatible
---
 parquet/pqarrow/column_readers.go      |  21 ++-
 parquet/pqarrow/column_readers_test.go | 275 +++++++++++++++++++++++++++++++++
 parquet/pqarrow/file_reader.go         |   4 +-
 3 files changed, 292 insertions(+), 8 deletions(-)
diff --git a/parquet/pqarrow/column_readers.go b/parquet/pqarrow/column_readers.go
index 3ea81a71..1be8a48e 100644
--- a/parquet/pqarrow/column_readers.go
+++ b/parquet/pqarrow/column_readers.go
@@ -319,7 +319,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
return nil, err
}
- childArrData[i], err = chunksToSingle(field)
+ childArrData[i], err = chunksToSingle(field, sr.rctx.mem)
field.Release() // release field before checking
if err != nil {
return nil, err
@@ -442,7 +442,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read)))
}
- item, err := chunksToSingle(arr)
+ item, err := chunksToSingle(arr, lr.rctx.mem)
if err != nil {
return nil, err
}
@@ -489,9 +489,7 @@ func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.Level
}
// helper function to combine chunks into a single array.
-//
-// nested data conversion for chunked array outputs not yet implemented
-func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
+func chunksToSingle(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) {
switch len(chunked.Chunks()) {
case 0:
return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
@@ -499,8 +497,17 @@ func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
data := chunked.Chunk(0).Data()
data.Retain() // we pass control to the caller
return data, nil
- default: // if an item reader yields a chunked array, this is not yet implemented
- return nil, arrow.ErrNotImplemented
+ default:
+ // concatenate multiple chunks into a single array
+ concatenated, err := array.Concatenate(chunked.Chunks(), mem)
+ if err != nil {
+ return nil, err
+ }
+ defer concatenated.Release()
+
+ data := concatenated.Data()
+ data.Retain()
+ return data, nil
}
}
diff --git a/parquet/pqarrow/column_readers_test.go b/parquet/pqarrow/column_readers_test.go
new file mode 100644
index 00000000..6a86b3f9
--- /dev/null
+++ b/parquet/pqarrow/column_readers_test.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pqarrow
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/compress"
+ "github.com/apache/arrow-go/v18/parquet/file"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+func TestChunksToSingle(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ t.Run("empty chunked array", func(t *testing.T) {
+ chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{})
+ defer chunked.Release()
+
+ result, err := chunksToSingle(chunked, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ assert.Equal(t, 0, result.Len())
+ })
+
+ t.Run("single chunk", func(t *testing.T) {
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+ bldr.AppendValues([]int32{1, 2, 3, 4, 5}, nil)
+ arr := bldr.NewInt32Array()
+ defer arr.Release()
+
+ chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr})
+ defer chunked.Release()
+
+ result, err := chunksToSingle(chunked, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ assert.Equal(t, 5, result.Len())
+ })
+
+ t.Run("multiple chunks", func(t *testing.T) {
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+
+ bldr.AppendValues([]int32{1, 2, 3}, nil)
+ chunk1 := bldr.NewInt32Array()
+ defer chunk1.Release()
+
+ bldr.AppendValues([]int32{4, 5, 6}, nil)
+ chunk2 := bldr.NewInt32Array()
+ defer chunk2.Release()
+
+ bldr.AppendValues([]int32{7, 8, 9, 10}, nil)
+ chunk3 := bldr.NewInt32Array()
+ defer chunk3.Release()
+
+ chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2, chunk3})
+ defer chunked.Release()
+
+ result, err := chunksToSingle(chunked, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ assert.Equal(t, 10, result.Len())
+
+ // Verify concatenated values
+ resultArr := array.MakeFromData(result).(*array.Int32)
+ defer resultArr.Release()
+ for i, expected := range []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
+ assert.Equal(t, expected, resultArr.Value(i))
+ }
+ })
+
+ t.Run("multiple chunks with nulls", func(t *testing.T) {
+ bldr := array.NewInt32Builder(mem)
+ defer bldr.Release()
+
+ bldr.AppendValues([]int32{1, 0, 3}, []bool{true, false, true})
+ chunk1 := bldr.NewInt32Array()
+ defer chunk1.Release()
+
+ bldr.AppendValues([]int32{4, 0, 6}, []bool{true, false, true})
+ chunk2 := bldr.NewInt32Array()
+ defer chunk2.Release()
+
+ chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
+ defer chunked.Release()
+
+ result, err := chunksToSingle(chunked, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ assert.Equal(t, 6, result.Len())
+ assert.Equal(t, 2, result.NullN())
+
+ resultArr := array.MakeFromData(result).(*array.Int32)
+ defer resultArr.Release()
+ assert.False(t, resultArr.IsValid(1))
+ assert.False(t, resultArr.IsValid(4))
+ assert.Equal(t, int32(1), resultArr.Value(0))
+ assert.Equal(t, int32(3), resultArr.Value(2))
+ })
+
+ t.Run("multiple chunks string type", func(t *testing.T) {
+ bldr := array.NewStringBuilder(mem)
+ defer bldr.Release()
+
+ bldr.AppendValues([]string{"hello", "world"}, nil)
+ chunk1 := bldr.NewStringArray()
+ defer chunk1.Release()
+
+ bldr.AppendValues([]string{"arrow", "parquet"}, nil)
+ chunk2 := bldr.NewStringArray()
+ defer chunk2.Release()
+
+ chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{chunk1, chunk2})
+ defer chunked.Release()
+
+ result, err := chunksToSingle(chunked, mem)
+ require.NoError(t, err)
+ defer result.Release()
+
+ assert.Equal(t, 4, result.Len())
+
+ resultArr := array.MakeFromData(result).(*array.String)
+ defer resultArr.Release()
+ assert.Equal(t, "hello", resultArr.Value(0))
+ assert.Equal(t, "parquet", resultArr.Value(3))
+ })
+}
+
+func TestChunkedTableRoundTrip(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ schema := arrow.NewSchema(
+ []arrow.Field{
+ {Name: "int64_col", Type: arrow.PrimitiveTypes.Int64,
Nullable: false},
+ {Name: "string_col", Type: arrow.BinaryTypes.String,
Nullable: true},
+ },
+ nil,
+ )
+
+ // Test data across 3 chunks: 5 + 3 + 2 = 10 rows
+ allInt64Values := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
+ allStringValues := []string{"hello", "world", "arrow", "parquet", "go",
"chunked", "table", "test", "final", "chunk"}
+
+ var buf bytes.Buffer
+ props := parquet.NewWriterProperties(
+ parquet.WithCompression(compress.Codecs.Snappy),
+ parquet.WithAllocator(mem),
+ )
+
+ writerProps := NewArrowWriterProperties(
+ WithAllocator(mem),
+ )
+
+ writer, err := NewFileWriter(schema, &buf, props, writerProps)
+ require.NoError(t, err)
+
+ // Write three chunks: 5 rows, 3 rows, 2 rows
+ chunks := []struct{ start, end int }{
+ {0, 5}, // First chunk: 5 rows
+ {5, 8}, // Second chunk: 3 rows
+ {8, 10}, // Third chunk: 2 rows
+ }
+
+ for _, chunk := range chunks {
+ int64Builder := array.NewInt64Builder(mem)
+ int64Builder.AppendValues(allInt64Values[chunk.start:chunk.end], nil)
+ int64Arr := int64Builder.NewInt64Array()
+ int64Builder.Release()
+
+ stringBuilder := array.NewStringBuilder(mem)
+ stringBuilder.AppendValues(allStringValues[chunk.start:chunk.end], nil)
+ stringArr := stringBuilder.NewStringArray()
+ stringBuilder.Release()
+
+ rec := array.NewRecordBatch(schema, []arrow.Array{int64Arr, stringArr}, int64(chunk.end-chunk.start))
+
+ err = writer.Write(rec)
+ require.NoError(t, err)
+
+ rec.Release()
+ int64Arr.Release()
+ stringArr.Release()
+ }
+
+ err = writer.Close()
+ require.NoError(t, err)
+
+ // Read back from parquet
+ pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), file.WithReadProps(parquet.NewReaderProperties(mem)))
+ require.NoError(t, err)
+ defer pf.Close()
+
+ reader, err := NewFileReader(pf, ArrowReadProperties{}, mem)
+ require.NoError(t, err)
+
+ rr, err := reader.GetRecordReader(context.Background(), nil, nil)
+ require.NoError(t, err)
+ defer rr.Release()
+
+ var records []arrow.RecordBatch
+ for {
+ rec, err := rr.Read()
+ if err == io.EOF {
+ break
+ }
+ require.NoError(t, err)
+ rec.Retain()
+ records = append(records, rec)
+ }
+
+ readTable := array.NewTableFromRecords(schema, records)
+ defer readTable.Release()
+
+ for _, rec := range records {
+ rec.Release()
+ }
+
+ // Verify the read table
+ require.Equal(t, int64(10), readTable.NumRows())
+ require.Equal(t, int64(2), readTable.NumCols())
+
+ // Verify int64 column values
+ int64Col := readTable.Column(0).Data()
+ int64Single, err := chunksToSingle(int64Col, mem)
+ require.NoError(t, err)
+ defer int64Single.Release()
+ int64Arr := array.MakeFromData(int64Single).(*array.Int64)
+ defer int64Arr.Release()
+ for i := 0; i < int64Arr.Len(); i++ {
+ assert.Equal(t, allInt64Values[i], int64Arr.Value(i))
+ }
+
+ // Verify string column values
+ stringCol := readTable.Column(1).Data()
+ stringSingle, err := chunksToSingle(stringCol, mem)
+ require.NoError(t, err)
+ defer stringSingle.Release()
+ stringArr := array.MakeFromData(stringSingle).(*array.String)
+ defer stringArr.Release()
+ for i := 0; i < stringArr.Len(); i++ {
+ assert.Equal(t, allStringValues[i], stringArr.Value(i))
+ }
+}
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index c85b2fc9..f59a589e 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -527,6 +527,7 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
parallel: fr.Props.Parallel,
sc: sc,
fieldReaders: readers,
+ mem: fr.mem,
}
rr.refCount.Add(1)
return rr, nil
@@ -721,6 +722,7 @@ type recordReader struct {
fieldReaders []*ColumnReader
cur arrow.RecordBatch
err error
+ mem memory.Allocator
refCount atomic.Int64
}
@@ -789,7 +791,7 @@ func (r *recordReader) next() bool {
return io.EOF
}
- arrdata, err := chunksToSingle(data)
+ arrdata, err := chunksToSingle(data, r.mem)
if err != nil {
return err
}