This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new c2904b0bf8 GH-34330: [Go][Parquet]: Add Extension type support (#34631)
c2904b0bf8 is described below
commit c2904b0bf8ca142dacd44bced8510b5bea540538
Author: Yevgeny Pats <[email protected]>
AuthorDate: Wed Apr 12 15:59:36 2023 -0400
GH-34330: [Go][Parquet]: Add Extension type support (#34631)
Follow-up instead of https://github.com/apache/arrow/pull/34356
* Closes: #34330
This is not yet complete but I would love some direction on where should I
add tests and in what other places I should handle the new extension type.
Lead-authored-by: Kemal Hadimli <[email protected]>
Co-authored-by: Herman Schaaf <[email protected]>
Co-authored-by: Yevgeny Pats <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/array/array_test.go | 2 +-
go/arrow/array/dictionary_test.go | 2 +-
go/arrow/array/diff_test.go | 2 +-
go/arrow/array/extension_test.go | 2 +-
go/arrow/array/table_test.go | 15 +-
go/arrow/compute/cast_test.go | 2 +-
go/arrow/compute/internal/exec/span_test.go | 2 +-
go/arrow/compute/vector_selection_test.go | 2 +-
go/arrow/csv/reader_test.go | 15 +-
go/arrow/csv/writer_test.go | 3 +-
go/arrow/datatype_extension_test.go | 2 +-
go/arrow/internal/arrdata/arrdata.go | 2 +-
go/arrow/internal/flight_integration/scenario.go | 2 +-
.../ipc/cmd/arrow-json-integration-test/main.go | 2 +-
go/arrow/ipc/endian_swap_test.go | 2 +-
go/arrow/ipc/metadata_test.go | 2 +-
go/arrow/table.go | 34 +-
.../testing => internal}/types/extension_types.go | 8 +-
.../types/extension_types_test.go} | 2 +-
go/parquet/internal/encoding/types.go | 6 +
go/parquet/internal/testutils/random_arrow.go | 38 +-
go/parquet/pqarrow/column_readers.go | 42 +-
go/parquet/pqarrow/encode_arrow.go | 60 +--
go/parquet/pqarrow/encode_arrow_test.go | 429 ++++++++++++++++-----
go/parquet/pqarrow/encode_dictionary_test.go | 4 +-
go/parquet/pqarrow/file_reader.go | 8 +-
go/parquet/pqarrow/file_writer.go | 2 +-
go/parquet/pqarrow/path_builder.go | 4 +-
go/parquet/pqarrow/path_builder_test.go | 48 +++
go/parquet/pqarrow/schema.go | 34 +-
go/parquet/pqarrow/schema_test.go | 43 +++
testing | 2 +-
32 files changed, 599 insertions(+), 224 deletions(-)
diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index 74eb0c4839..ab1b8831c8 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -22,8 +22,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/internal/testing/tools"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
)
diff --git a/go/arrow/array/dictionary_test.go
b/go/arrow/array/dictionary_test.go
index 5c999df315..667bf5b24a 100644
--- a/go/arrow/array/dictionary_test.go
+++ b/go/arrow/array/dictionary_test.go
@@ -27,8 +27,8 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/bitutil"
"github.com/apache/arrow/go/v12/arrow/decimal128"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
diff --git a/go/arrow/array/diff_test.go b/go/arrow/array/diff_test.go
index e2c7820d5f..55f496b4db 100644
--- a/go/arrow/array/diff_test.go
+++ b/go/arrow/array/diff_test.go
@@ -26,8 +26,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
)
type diffTestCase struct {
diff --git a/go/arrow/array/extension_test.go b/go/arrow/array/extension_test.go
index 121bf22358..05f3fc26b6 100644
--- a/go/arrow/array/extension_test.go
+++ b/go/arrow/array/extension_test.go
@@ -21,8 +21,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/suite"
)
diff --git a/go/arrow/array/table_test.go b/go/arrow/array/table_test.go
index 62437104d6..67cab2066e 100644
--- a/go/arrow/array/table_test.go
+++ b/go/arrow/array/table_test.go
@@ -17,6 +17,7 @@
package array_test
import (
+ "errors"
"fmt"
"reflect"
"testing"
@@ -157,7 +158,17 @@ func TestChunkedInvalid(t *testing.T) {
if e == nil {
t.Fatalf("expected a panic")
}
- if got, want := e.(string), "arrow/array: mismatch data type";
got != want {
+
+ err, ok := e.(error)
+ if !ok {
+ t.Fatalf("expected an error")
+ }
+
+ if !errors.Is(err, arrow.ErrInvalid) {
+ t.Fatalf("should be an ErrInvalid")
+ }
+
+ if got, want := err.Error(), fmt.Sprintf("%s: arrow/array:
mismatch data type float64 vs int32", arrow.ErrInvalid); got != want {
t.Fatalf("invalid error. got=%q, want=%q", got, want)
}
}()
@@ -313,7 +324,7 @@ func TestColumn(t *testing.T) {
return c
}(),
field: arrow.Field{Name: "f32", Type:
arrow.PrimitiveTypes.Float32},
- err: fmt.Errorf("arrow/array: inconsistent data
type"),
+ err: fmt.Errorf("%w: arrow/array: inconsistent data
type float64 vs float32", arrow.ErrInvalid),
},
} {
t.Run("", func(t *testing.T) {
diff --git a/go/arrow/compute/cast_test.go b/go/arrow/compute/cast_test.go
index 5c9c7f9cd9..921143bee4 100644
--- a/go/arrow/compute/cast_test.go
+++ b/go/arrow/compute/cast_test.go
@@ -33,9 +33,9 @@ import (
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/decimal256"
"github.com/apache/arrow/go/v12/arrow/internal/testing/gen"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/arrow/scalar"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
diff --git a/go/arrow/compute/internal/exec/span_test.go
b/go/arrow/compute/internal/exec/span_test.go
index 52409daadb..c62113e48c 100644
--- a/go/arrow/compute/internal/exec/span_test.go
+++ b/go/arrow/compute/internal/exec/span_test.go
@@ -29,9 +29,9 @@ import (
"github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/endian"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/arrow/scalar"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
)
diff --git a/go/arrow/compute/vector_selection_test.go
b/go/arrow/compute/vector_selection_test.go
index b328f5fdf4..209e9b8d59 100644
--- a/go/arrow/compute/vector_selection_test.go
+++ b/go/arrow/compute/vector_selection_test.go
@@ -30,9 +30,9 @@ import (
"github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
"github.com/apache/arrow/go/v12/arrow/compute/internal/kernels"
"github.com/apache/arrow/go/v12/arrow/internal/testing/gen"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/arrow/scalar"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
diff --git a/go/arrow/csv/reader_test.go b/go/arrow/csv/reader_test.go
index c5bd90d76e..560354fb64 100644
--- a/go/arrow/csv/reader_test.go
+++ b/go/arrow/csv/reader_test.go
@@ -31,8 +31,8 @@ import (
"github.com/apache/arrow/go/v12/arrow/csv"
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/decimal256"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -165,10 +165,10 @@ func Example_withChunk() {
}
func TestCSVReadInvalidFields(t *testing.T) {
- tests := []struct {
- Name string
- Data string
- Fields []arrow.Field
+ tests := []struct {
+ Name string
+ Data string
+ Fields []arrow.Field
ExpectedError bool
}{
{
@@ -201,13 +201,14 @@ func TestCSVReadInvalidFields(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
f := bytes.NewBufferString(tc.Data)
schema := arrow.NewSchema(tc.Fields, nil)
-
+
r := csv.NewReader(
f, schema,
csv.WithComma(','),
)
defer r.Release()
- for r.Next() {}
+ for r.Next() {
+ }
parseErr := r.Err()
if tc.ExpectedError && parseErr == nil {
t.Fatal("Expected error, but none found")
diff --git a/go/arrow/csv/writer_test.go b/go/arrow/csv/writer_test.go
index bfe0bcddf9..9f1b524ce7 100644
--- a/go/arrow/csv/writer_test.go
+++ b/go/arrow/csv/writer_test.go
@@ -30,8 +30,8 @@ import (
"github.com/apache/arrow/go/v12/arrow/csv"
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/decimal256"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/google/uuid"
)
@@ -257,7 +257,6 @@ func testCSVWriter(t *testing.T, data [][]string,
writeHeader bool, fmtr func(bo
listBuilderInt64.AppendValues([]int64{7, 8, 9}, nil)
b.Field(18).(*array.BinaryBuilder).AppendValues([][]byte{{0, 1, 2}, {3,
4, 5}, {}}, nil)
b.Field(19).(*types.UUIDBuilder).AppendValues([]uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001"),
uuid.MustParse("00000000-0000-0000-0000-000000000002"),
uuid.MustParse("00000000-0000-0000-0000-000000000003")}, nil)
-
for _, field := range b.Fields() {
field.AppendNull()
diff --git a/go/arrow/datatype_extension_test.go
b/go/arrow/datatype_extension_test.go
index c9a62aa5d8..0458437b09 100644
--- a/go/arrow/datatype_extension_test.go
+++ b/go/arrow/datatype_extension_test.go
@@ -21,7 +21,7 @@ import (
"testing"
"github.com/apache/arrow/go/v12/arrow"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
diff --git a/go/arrow/internal/arrdata/arrdata.go
b/go/arrow/internal/arrdata/arrdata.go
index aa05939356..ae4f917ca1 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -25,9 +25,9 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/decimal128"
"github.com/apache/arrow/go/v12/arrow/float16"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
)
var (
diff --git a/go/arrow/internal/flight_integration/scenario.go
b/go/arrow/internal/flight_integration/scenario.go
index f2d2d693d9..b8b214ac7d 100644
--- a/go/arrow/internal/flight_integration/scenario.go
+++ b/go/arrow/internal/flight_integration/scenario.go
@@ -34,9 +34,9 @@ import (
"github.com/apache/arrow/go/v12/arrow/flight/flightsql"
"github.com/apache/arrow/go/v12/arrow/flight/flightsql/schema_ref"
"github.com/apache/arrow/go/v12/arrow/internal/arrjson"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"golang.org/x/xerrors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
diff --git a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
index ad78b61f2e..c7e5e3c14b 100644
--- a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
@@ -26,8 +26,8 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/arrio"
"github.com/apache/arrow/go/v12/arrow/internal/arrjson"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/ipc"
+ "github.com/apache/arrow/go/v12/internal/types"
)
func main() {
diff --git a/go/arrow/ipc/endian_swap_test.go b/go/arrow/ipc/endian_swap_test.go
index 146f7b08e1..73561e42e8 100644
--- a/go/arrow/ipc/endian_swap_test.go
+++ b/go/arrow/ipc/endian_swap_test.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/endian"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
diff --git a/go/arrow/ipc/metadata_test.go b/go/arrow/ipc/metadata_test.go
index 805b1c5e1e..dc24164c28 100644
--- a/go/arrow/ipc/metadata_test.go
+++ b/go/arrow/ipc/metadata_test.go
@@ -25,8 +25,8 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/internal/dictutils"
"github.com/apache/arrow/go/v12/arrow/internal/flatbuf"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/assert"
)
diff --git a/go/arrow/table.go b/go/arrow/table.go
index 467102c00c..0d20d955ab 100644
--- a/go/arrow/table.go
+++ b/go/arrow/table.go
@@ -17,6 +17,7 @@
package arrow
import (
+ "fmt"
"sync/atomic"
"github.com/apache/arrow/go/v12/arrow/internal/debug"
@@ -42,20 +43,19 @@ type Table interface {
// To get strongly typed data from a Column, you need to iterate the
// chunks and type assert each individual Array. For example:
//
-// switch column.DataType().ID {
-// case arrow.INT32:
-// for _, c := range column.Data().Chunks() {
-// arr := c.(*array.Int32)
-// // do something with arr
-// }
-// case arrow.INT64:
-// for _, c := range column.Data().Chunks() {
-// arr := c.(*array.Int64)
-// // do something with arr
-// }
-// case ...
+// switch column.DataType().ID {
+// case arrow.INT32:
+// for _, c := range column.Data().Chunks() {
+// arr := c.(*array.Int32)
+// // do something with arr
// }
-//
+// case arrow.INT64:
+// for _, c := range column.Data().Chunks() {
+// arr := c.(*array.Int64)
+// // do something with arr
+// }
+// case ...
+// }
type Column struct {
field Field
data *Chunked
@@ -69,7 +69,7 @@ type Column struct {
// of the ref counting.
func NewColumnFromArr(field Field, arr Array) Column {
if !TypeEqual(field.Type, arr.DataType()) {
- panic("arrow/array: inconsistent data type")
+ panic(fmt.Errorf("%w: arrow/array: inconsistent data type %s vs
%s", ErrInvalid, field.Type, arr.DataType()))
}
arr.Retain()
@@ -98,7 +98,7 @@ func NewColumn(field Field, chunks *Chunked) *Column {
if !TypeEqual(col.data.DataType(), col.field.Type) {
col.data.Release()
- panic("arrow/array: inconsistent data type")
+ panic(fmt.Errorf("%w: arrow/array: inconsistent data type %s vs
%s", ErrInvalid, col.data.DataType(), col.field.Type))
}
return &col
@@ -148,9 +148,9 @@ func NewChunked(dtype DataType, chunks []Array) *Chunked {
if chunk == nil {
continue
}
-
+
if !TypeEqual(chunk.DataType(), dtype) {
- panic("arrow/array: mismatch data type")
+ panic(fmt.Errorf("%w: arrow/array: mismatch data type
%s vs %s", ErrInvalid, chunk.DataType().String(), dtype.String()))
}
chunk.Retain()
arr.chunks = append(arr.chunks, chunk)
diff --git a/go/arrow/internal/testing/types/extension_types.go
b/go/internal/types/extension_types.go
similarity index 98%
rename from go/arrow/internal/testing/types/extension_types.go
rename to go/internal/types/extension_types.go
index 80d8111b4e..bb087a7d34 100644
--- a/go/arrow/internal/testing/types/extension_types.go
+++ b/go/internal/types/extension_types.go
@@ -209,13 +209,13 @@ func (UUIDType) ExtensionName() string { return "uuid" }
func (UUIDType) Serialize() string { return "uuid-serialized" }
// Deserialize expects storageType to be FixedSizeBinaryType{ByteWidth: 16}
and the data to be
-// "uuid-serialized" in order to correctly create a UuidType for testing
deserialize.
+// "uuid-serialized" in order to correctly create a UUIDType for testing
deserialize.
func (UUIDType) Deserialize(storageType arrow.DataType, data string)
(arrow.ExtensionType, error) {
if string(data) != "uuid-serialized" {
return nil, fmt.Errorf("type identifier did not match: '%s'",
string(data))
}
if !arrow.TypeEqual(storageType, &arrow.FixedSizeBinaryType{ByteWidth:
16}) {
- return nil, fmt.Errorf("invalid storage type for UuidType: %s",
storageType.Name())
+ return nil, fmt.Errorf("invalid storage type for UUIDType: %s",
storageType.Name())
}
return NewUUIDType(), nil
}
@@ -258,7 +258,6 @@ func (a Parametric2Array) ValueString(i int) string {
return fmt.Sprintf("%d", arr.Value(i))
}
-
// A type where ExtensionName is always the same
type Parametric1Type struct {
arrow.ExtensionBase
@@ -266,7 +265,6 @@ type Parametric1Type struct {
param int32
}
-
func NewParametric1Type(p int32) *Parametric1Type {
ret := &Parametric1Type{param: p}
ret.ExtensionBase.Storage = arrow.PrimitiveTypes.Int32
@@ -516,13 +514,11 @@ func (SmallintType) Deserialize(storageType
arrow.DataType, data string) (arrow.
}
var (
- _ arrow.ExtensionType = (*UUIDType)(nil)
_ arrow.ExtensionType = (*Parametric1Type)(nil)
_ arrow.ExtensionType = (*Parametric2Type)(nil)
_ arrow.ExtensionType = (*ExtStructType)(nil)
_ arrow.ExtensionType = (*DictExtensionType)(nil)
_ arrow.ExtensionType = (*SmallintType)(nil)
- _ array.ExtensionArray = (*UUIDArray)(nil)
_ array.ExtensionArray = (*Parametric1Array)(nil)
_ array.ExtensionArray = (*Parametric2Array)(nil)
_ array.ExtensionArray = (*ExtStructArray)(nil)
diff --git a/go/arrow/internal/testing/types/extension_test.go
b/go/internal/types/extension_types_test.go
similarity index 97%
rename from go/arrow/internal/testing/types/extension_test.go
rename to go/internal/types/extension_types_test.go
index a0c07cdda3..39372bdf7d 100644
--- a/go/arrow/internal/testing/types/extension_test.go
+++ b/go/internal/types/extension_types_test.go
@@ -23,8 +23,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
- "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
diff --git a/go/parquet/internal/encoding/types.go
b/go/parquet/internal/encoding/types.go
index 83da564bdf..a53c98e979 100644
--- a/go/parquet/internal/encoding/types.go
+++ b/go/parquet/internal/encoding/types.go
@@ -339,6 +339,12 @@ func (b *BufferWriter) Finish() *memory.Buffer {
return buf
}
+// Release the underlying buffer and not allocate anything else. To re-use
this buffer, Reset() or Finish() should be called
+func (b *BufferWriter) Release() {
+ b.buffer.Release()
+ b.buffer = nil
+}
+
func (b *BufferWriter) Truncate() {
b.pos = 0
b.offset = 0
diff --git a/go/parquet/internal/testutils/random_arrow.go
b/go/parquet/internal/testutils/random_arrow.go
index 719a9f5dd0..71be6c5d63 100644
--- a/go/parquet/internal/testutils/random_arrow.go
+++ b/go/parquet/internal/testutils/random_arrow.go
@@ -33,80 +33,80 @@ import (
// binary will have each value between length 2 and 12 but random bytes that
are not limited to ascii
// fixed size binary will all be of length 10, random bytes are not limited to
ascii
// bool will be approximately half false and half true randomly.
-func RandomNonNull(dt arrow.DataType, size int) arrow.Array {
+func RandomNonNull(mem memory.Allocator, dt arrow.DataType, size int)
arrow.Array {
switch dt.ID() {
case arrow.FLOAT32:
- bldr := array.NewFloat32Builder(memory.DefaultAllocator)
+ bldr := array.NewFloat32Builder(mem)
defer bldr.Release()
values := make([]float32, size)
FillRandomFloat32(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.FLOAT64:
- bldr := array.NewFloat64Builder(memory.DefaultAllocator)
+ bldr := array.NewFloat64Builder(mem)
defer bldr.Release()
values := make([]float64, size)
FillRandomFloat64(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.INT64:
- bldr := array.NewInt64Builder(memory.DefaultAllocator)
+ bldr := array.NewInt64Builder(mem)
defer bldr.Release()
values := make([]int64, size)
FillRandomInt64(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.UINT64:
- bldr := array.NewUint64Builder(memory.DefaultAllocator)
+ bldr := array.NewUint64Builder(mem)
defer bldr.Release()
values := make([]uint64, size)
FillRandomUint64(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.INT32:
- bldr := array.NewInt32Builder(memory.DefaultAllocator)
+ bldr := array.NewInt32Builder(mem)
defer bldr.Release()
values := make([]int32, size)
FillRandomInt32(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.UINT32:
- bldr := array.NewUint32Builder(memory.DefaultAllocator)
+ bldr := array.NewUint32Builder(mem)
defer bldr.Release()
values := make([]uint32, size)
FillRandomUint32(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.INT16:
- bldr := array.NewInt16Builder(memory.DefaultAllocator)
+ bldr := array.NewInt16Builder(mem)
defer bldr.Release()
values := make([]int16, size)
FillRandomInt16(0, 0, 64, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.UINT16:
- bldr := array.NewUint16Builder(memory.DefaultAllocator)
+ bldr := array.NewUint16Builder(mem)
defer bldr.Release()
values := make([]uint16, size)
FillRandomUint16(0, 0, 64, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.INT8:
- bldr := array.NewInt8Builder(memory.DefaultAllocator)
+ bldr := array.NewInt8Builder(mem)
defer bldr.Release()
values := make([]int8, size)
FillRandomInt8(0, 0, 64, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.UINT8:
- bldr := array.NewUint8Builder(memory.DefaultAllocator)
+ bldr := array.NewUint8Builder(mem)
defer bldr.Release()
values := make([]uint8, size)
FillRandomUint8(0, 0, 64, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
case arrow.DATE32:
- bldr := array.NewDate32Builder(memory.DefaultAllocator)
+ bldr := array.NewDate32Builder(mem)
defer bldr.Release()
values := make([]int32, size)
FillRandomInt32Max(0, 24, values)
@@ -118,7 +118,7 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array
{
bldr.AppendValues(dates, nil)
return bldr.NewArray()
case arrow.DATE64:
- bldr := array.NewDate64Builder(memory.DefaultAllocator)
+ bldr := array.NewDate64Builder(mem)
defer bldr.Release()
values := make([]int64, size)
FillRandomInt64Max(0, 24, values)
@@ -130,21 +130,21 @@ func RandomNonNull(dt arrow.DataType, size int)
arrow.Array {
bldr.AppendValues(dates, nil)
return bldr.NewArray()
case arrow.STRING:
- bldr := array.NewStringBuilder(memory.DefaultAllocator)
+ bldr := array.NewStringBuilder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append("test-string")
}
return bldr.NewArray()
case arrow.LARGE_STRING:
- bldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+ bldr := array.NewLargeStringBuilder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append("test-large-string")
}
return bldr.NewArray()
case arrow.BINARY, arrow.LARGE_BINARY:
- bldr := array.NewBinaryBuilder(memory.DefaultAllocator,
dt.(arrow.BinaryDataType))
+ bldr := array.NewBinaryBuilder(mem, dt.(arrow.BinaryDataType))
defer bldr.Release()
buf := make([]byte, 12)
@@ -156,7 +156,7 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array
{
}
return bldr.NewArray()
case arrow.FIXED_SIZE_BINARY:
- bldr :=
array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator,
&arrow.FixedSizeBinaryType{ByteWidth: 10})
+ bldr := array.NewFixedSizeBinaryBuilder(mem,
&arrow.FixedSizeBinaryType{ByteWidth: 10})
defer bldr.Release()
buf := make([]byte, 10)
@@ -168,14 +168,14 @@ func RandomNonNull(dt arrow.DataType, size int)
arrow.Array {
return bldr.NewArray()
case arrow.DECIMAL:
dectype := dt.(*arrow.Decimal128Type)
- bldr := array.NewDecimal128Builder(memory.DefaultAllocator,
dectype)
+ bldr := array.NewDecimal128Builder(mem, dectype)
defer bldr.Release()
data := RandomDecimals(int64(size), 0, dectype.Precision)
bldr.AppendValues(arrow.Decimal128Traits.CastFromBytes(data),
nil)
return bldr.NewArray()
case arrow.BOOL:
- bldr := array.NewBooleanBuilder(memory.DefaultAllocator)
+ bldr := array.NewBooleanBuilder(mem)
defer bldr.Release()
values := make([]bool, size)
diff --git a/go/parquet/pqarrow/column_readers.go
b/go/parquet/pqarrow/column_readers.go
index e0179f8340..1bcaa7adf7 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -18,6 +18,7 @@ package pqarrow
import (
"encoding/binary"
+ "errors"
"fmt"
"reflect"
"sync"
@@ -35,7 +36,6 @@ import (
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/schema"
"golang.org/x/sync/errgroup"
- "golang.org/x/xerrors"
)
// column reader for leaf columns (non-nested)
@@ -201,7 +201,7 @@ func (sr *structReader) IsOrHasRepeatedChild() bool {
return sr.hasRepeatedChild
func (sr *structReader) GetDefLevels() ([]int16, error) {
if len(sr.children) == 0 {
- return nil, xerrors.New("struct raeder has no children")
+ return nil, errors.New("struct reader has no children")
}
// this method should only be called when this struct or one of its
parents
@@ -212,7 +212,7 @@ func (sr *structReader) GetDefLevels() ([]int16, error) {
func (sr *structReader) GetRepLevels() ([]int16, error) {
if len(sr.children) == 0 {
- return nil, xerrors.New("struct raeder has no children")
+ return nil, errors.New("struct reader has no children")
}
// this method should only be called when this struct or one of its
parents
@@ -453,14 +453,19 @@ func chunksToSingle(chunked *arrow.Chunked)
(arrow.ArrayData, error) {
case 1:
return chunked.Chunk(0).Data(), nil
default: // if an item reader yields a chunked array, this is not yet
implemented
- return nil, xerrors.New("not implemented")
+ return nil, arrow.ErrNotImplemented
}
}
// create a chunked arrow array from the raw record data
func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr
*schema.Column, mem memory.Allocator) (*arrow.Chunked, error) {
+ dt := valueType
+ if valueType.ID() == arrow.EXTENSION {
+ dt = valueType.(arrow.ExtensionType).StorageType()
+ }
+
var data arrow.ArrayData
- switch valueType.ID() {
+ switch dt.ID() {
case arrow.DICTIONARY:
return transferDictionary(rdr, valueType), nil
case arrow.NULL:
@@ -490,7 +495,7 @@ func transferColumnData(rdr file.RecordReader, valueType
arrow.DataType, descr *
case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
return
transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType)
default:
- return nil, xerrors.New("physical type for decimal128
must be int32, int64, bytearray or fixed len byte array")
+ return nil, errors.New("physical type for decimal128
must be int32, int64, bytearray or fixed len byte array")
}
case arrow.TIMESTAMP:
tstype := valueType.(*arrow.TimestampType)
@@ -504,7 +509,7 @@ func transferColumnData(rdr file.RecordReader, valueType
arrow.DataType, descr *
data = transferZeroCopy(rdr, valueType)
}
default:
- return nil, xerrors.New("time unit not supported")
+ return nil, errors.New("time unit not supported")
}
default:
return nil, fmt.Errorf("no support for reading columns of type:
%s", valueType.Name())
@@ -538,12 +543,23 @@ func transferBinary(rdr file.RecordReader, dt
arrow.DataType) *arrow.Chunked {
return transferDictionary(brdr,
&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt})
}
chunks := brdr.GetBuilderChunks()
- if dt == arrow.BinaryTypes.String || dt ==
arrow.BinaryTypes.LargeString {
- // convert chunks from binary to string without copying data,
- // just changing the interpretation of the metadata
+ switch {
+ case dt.ID() == arrow.EXTENSION:
+ etype := dt.(arrow.ExtensionType)
+ for idx, chk := range chunks {
+ chunks[idx] = array.NewExtensionArrayWithStorage(etype,
chk)
+ chk.Release() // NewExtensionArrayWithStorage will call
retain on chk, so it still needs to be released
+ defer chunks[idx].Release()
+ }
+ case dt == arrow.BinaryTypes.String || dt ==
arrow.BinaryTypes.LargeString:
for idx := range chunks {
+ prev := chunks[idx]
chunks[idx] = array.MakeFromData(chunks[idx].Data())
- defer chunks[idx].Data().Release()
+ prev.Release()
+ defer chunks[idx].Release()
+ }
+ default:
+ for idx := range chunks {
defer chunks[idx].Release()
}
}
@@ -639,8 +655,10 @@ func transferBool(rdr file.RecordReader) arrow.ArrayData {
if bitmap != nil {
defer bitmap.Release()
}
+ bb := memory.NewBufferBytes(data)
+ defer bb.Release()
return array.NewData(&arrow.BooleanType{}, length, []*memory.Buffer{
- bitmap, memory.NewBufferBytes(data),
+ bitmap, bb,
}, nil, int(rdr.NullCount()), 0)
}
diff --git a/go/parquet/pqarrow/encode_arrow.go
b/go/parquet/pqarrow/encode_arrow.go
index a9b9e0b16e..a467686720 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -19,6 +19,7 @@ package pqarrow
import (
"context"
"encoding/binary"
+ "errors"
"fmt"
"time"
"unsafe"
@@ -31,13 +32,14 @@ import (
"github.com/apache/arrow/go/v12/internal/utils"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/file"
- "golang.org/x/xerrors"
)
// get the count of the number of leaf arrays for the type
func calcLeafCount(dt arrow.DataType) int {
switch dt.ID() {
- case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION:
+ case arrow.EXTENSION:
+ return calcLeafCount(dt.(arrow.ExtensionType).StorageType())
+ case arrow.SPARSE_UNION, arrow.DENSE_UNION:
panic("arrow type not implemented")
case arrow.DICTIONARY:
return calcLeafCount(dt.(*arrow.DictionaryType).ValueType)
@@ -112,7 +114,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size
int64, manifest *Sch
}
if absPos >= int64(data.Len()) {
- return ArrowColumnWriter{}, xerrors.New("cannot write data at
offset past end of chunked array")
+ return ArrowColumnWriter{}, errors.New("cannot write data at
offset past end of chunked array")
}
leafCount := calcLeafCount(data.DataType())
@@ -188,7 +190,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context)
error {
defer res.Release()
if len(res.postListVisitedElems) != 1 {
- return xerrors.New("lists with non-zero length
null components are not supported")
+ return errors.New("lists with non-zero length
null components are not supported")
}
rng := res.postListVisitedElems[0]
values := array.NewSlice(res.leafArr, rng.start,
rng.end)
@@ -221,10 +223,18 @@ func WriteArrowToColumn(ctx context.Context, cw
file.ColumnChunkWriter, leafArr
cw.SetBitsBuffer(buf)
}
+ arrCtx := arrowCtxFromContext(ctx)
+ defer func() {
+ if arrCtx.dataBuffer != nil {
+ arrCtx.dataBuffer.Release()
+ arrCtx.dataBuffer = nil
+ }
+ }()
+
if leafArr.DataType().ID() == arrow.DICTIONARY {
- return writeDictionaryArrow(arrowCtxFromContext(ctx), cw,
leafArr, defLevels, repLevels, maybeParentNulls)
+ return writeDictionaryArrow(arrCtx, cw, leafArr, defLevels,
repLevels, maybeParentNulls)
}
- return writeDenseArrow(arrowCtxFromContext(ctx), cw, leafArr,
defLevels, repLevels, maybeParentNulls)
+ return writeDenseArrow(arrCtx, cw, leafArr, defLevels, repLevels,
maybeParentNulls)
}
type binaryarr interface {
@@ -236,6 +246,12 @@ type binary64arr interface {
}
func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter,
leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err
error) {
+ if leafArr.DataType().ID() == arrow.EXTENSION {
+ extensionArray := leafArr.(array.ExtensionArray)
+ // Replace leafArr with its underlying storage array
+ leafArr = extensionArray.Storage()
+ }
+
noNulls := cw.Descr().SchemaNode().RepetitionType() ==
parquet.Repetitions.Required || leafArr.NullN() == 0
if ctx.dataBuffer == nil {
@@ -250,7 +266,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
// TODO(mtopol): optimize this so that we aren't converting from
// the bitmap -> []bool -> bitmap anymore
if leafArr.Len() == 0 {
- wr.WriteBatch(nil, defLevels, repLevels)
+ _, err = wr.WriteBatch(nil, defLevels, repLevels)
break
}
@@ -321,7 +337,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls,
int64(leafArr.Data().Offset()))
@@ -346,7 +362,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
}
} else if (cw.Properties().Version() == parquet.V1_0 ||
cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
// absent superceding user instructions, when
writing a Parquet Version <=2.4 File,
- // timestamps in nano seconds are coerced to
microseconds
+ // timestamps in nanoseconds are coerced to
microseconds
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data =
arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
p :=
NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond),
WithTruncatedTimestamps(true))
@@ -383,14 +399,14 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls,
int64(leafArr.Data().Offset()))
}
case *file.Int96ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.TIMESTAMP {
- return xerrors.New("unsupported arrow type to write to
Int96 column")
+ return errors.New("unsupported arrow type to write to
Int96 column")
}
ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
data :=
parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
@@ -401,26 +417,26 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
nulls := leafArr.NullBitmapBytes()
wr.WriteBatchSpaced(data, defLevels, repLevels, nulls,
int64(leafArr.Data().Offset()))
}
case *file.Float32ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.FLOAT32 {
- return xerrors.New("invalid column type to write to
Float")
+ return errors.New("invalid column type to write to
Float")
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(leafArr.(*array.Float32).Float32Values(),
defLevels, repLevels)
+ _, err =
wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
} else {
wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels,
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
case *file.Float64ColumnChunkWriter:
if leafArr.DataType().ID() != arrow.FLOAT64 {
- return xerrors.New("invalid column type to write to
Float")
+ return errors.New("invalid column type to write to
Float")
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(leafArr.(*array.Float64).Float64Values(),
defLevels, repLevels)
+ _, err =
wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
} else {
wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels,
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
@@ -449,11 +465,11 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
data[i] =
parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
}
default:
- return xerrors.New(fmt.Sprintf("invalid column type to
write to ByteArray: %s", leafArr.DataType().Name()))
+ return fmt.Errorf("%w: invalid column type to write to
ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
wr.WriteBatchSpaced(data, defLevels, repLevels,
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
@@ -466,7 +482,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
data[idx] =
leafArr.(*array.FixedSizeBinary).Value(idx)
}
if !maybeParentNulls && noNulls {
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels,
repLevels)
} else {
wr.WriteBatchSpaced(data, defLevels, repLevels,
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
}
@@ -492,7 +508,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
for idx := range data {
data[idx] =
fixDecimalEndianness(arr.Value(idx))
}
- wr.WriteBatch(data, defLevels, repLevels)
+ _, err = wr.WriteBatch(data, defLevels,
repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
@@ -502,10 +518,10 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
wr.WriteBatchSpaced(data, defLevels, repLevels,
arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
default:
- return xerrors.New("unimplemented")
+ return fmt.Errorf("%w: invalid column type to write to
FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
default:
- return xerrors.New("unknown column writer physical type")
+ return errors.New("unknown column writer physical type")
}
return
}
diff --git a/go/parquet/pqarrow/encode_arrow_test.go
b/go/parquet/pqarrow/encode_arrow_test.go
index 9eea5b1706..ded398979a 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -30,8 +30,10 @@ import (
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/bitutil"
"github.com/apache/arrow/go/v12/arrow/decimal128"
+ "github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/internal/bitutils"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/apache/arrow/go/v12/internal/utils"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/compress"
@@ -40,6 +42,7 @@ import (
"github.com/apache/arrow/go/v12/parquet/internal/testutils"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/apache/arrow/go/v12/parquet/schema"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@@ -136,7 +139,8 @@ func TestWriteArrowCols(t *testing.T) {
manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)
- sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ sink := encoding.NewBufferWriter(0, mem)
+ defer sink.Release()
writer := file.NewParquetWriter(sink, psc.Root(),
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))
srgw := writer.AppendRowGroup()
@@ -230,15 +234,18 @@ func TestWriteArrowInt96(t *testing.T) {
tbl := makeDateTimeTypesTable(mem, false, false)
defer tbl.Release()
- props :=
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true))
+ props :=
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true),
pqarrow.WithAllocator(mem))
+
psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
require.NoError(t, err)
manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
require.NoError(t, err)
- sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
- writer := file.NewParquetWriter(sink, psc.Root())
+ sink := encoding.NewBufferWriter(0, mem)
+ defer sink.Release()
+
+ writer := file.NewParquetWriter(sink, psc.Root(),
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
srgw := writer.AppendRowGroup()
ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
@@ -283,8 +290,9 @@ func TestWriteArrowInt96(t *testing.T) {
assert.EqualValues(t, data.Value(5), vals[4].ToTime().UnixNano())
}
-func writeTableToBuffer(t *testing.T, tbl arrow.Table, rowGroupSize int64,
props pqarrow.ArrowWriterProperties) *memory.Buffer {
- sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table,
rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
+ sink := encoding.NewBufferWriter(0, mem)
+ defer sink.Release()
wrprops :=
parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
require.NoError(t, err)
@@ -314,13 +322,16 @@ func writeTableToBuffer(t *testing.T, tbl arrow.Table,
rowGroupSize int64, props
}
func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
- buf := writeTableToBuffer(t, tbl, rowGroupSize,
pqarrow.DefaultWriterProps())
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ buf := writeTableToBuffer(t, mem, tbl, rowGroupSize,
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
defer buf.Release()
rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
require.NoError(t, err)
- ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{},
memory.DefaultAllocator)
+ ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{},
mem)
require.NoError(t, err)
for i := 0; i < int(tbl.NumCols()); i++ {
@@ -344,6 +355,7 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table,
rowGroupSize int64) {
assert.True(t, array.Equal(chnk, slc.Chunk(0)))
}
}
+ crdr.Release()
}
}
@@ -361,11 +373,16 @@ func TestArrowReadWriteTableChunkedCols(t *testing.T) {
for _, chnksize := range chunkSizes {
chk := array.NewSlice(arr, offset, offset+int64(chnksize))
defer chk.Release()
+ defer chk.Release() // for NewChunked below
chunks = append(chunks, chk)
}
sc := arrow.NewSchema([]arrow.Field{{Name: "field", Type:
arr.DataType(), Nullable: true}}, nil)
- tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0),
arrow.NewChunked(arr.DataType(), chunks))}, -1)
+
+ chk := arrow.NewChunked(arr.DataType(), chunks)
+ defer chk.Release()
+
+ tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0),
chk)}, -1)
defer tbl.Release()
simpleRoundTrip(t, tbl, 2)
@@ -487,6 +504,16 @@ type ParquetIOTestSuite struct {
suite.Suite
}
+func (ps *ParquetIOTestSuite) SetupTest() {
+ ps.NoError(arrow.RegisterExtensionType(types.NewUUIDType()))
+}
+
+func (ps *ParquetIOTestSuite) TearDownTest() {
+ if arrow.GetExtensionType("uuid") != nil {
+ ps.NoError(arrow.UnregisterExtensionType("uuid"))
+ }
+}
+
func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep
parquet.Repetition) *schema.GroupNode {
byteWidth := int32(-1)
@@ -509,80 +536,80 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ
arrow.DataType, rep parquet.R
return schema.MustGroup(schema.NewGroupNode("schema",
parquet.Repetitions.Required, schema.FieldList{pnode}, -1))
}
-func (ps *ParquetIOTestSuite) makePrimitiveTestCol(size int, typ
arrow.DataType) arrow.Array {
+func (ps *ParquetIOTestSuite) makePrimitiveTestCol(mem memory.Allocator, size
int, typ arrow.DataType) arrow.Array {
switch typ.ID() {
case arrow.BOOL:
- bldr := array.NewBooleanBuilder(memory.DefaultAllocator)
+ bldr := array.NewBooleanBuilder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(boolTestValue)
}
return bldr.NewArray()
case arrow.INT8:
- bldr := array.NewInt8Builder(memory.DefaultAllocator)
+ bldr := array.NewInt8Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(int8TestVal)
}
return bldr.NewArray()
case arrow.UINT8:
- bldr := array.NewUint8Builder(memory.DefaultAllocator)
+ bldr := array.NewUint8Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(uint8TestVal)
}
return bldr.NewArray()
case arrow.INT16:
- bldr := array.NewInt16Builder(memory.DefaultAllocator)
+ bldr := array.NewInt16Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(int16TestVal)
}
return bldr.NewArray()
case arrow.UINT16:
- bldr := array.NewUint16Builder(memory.DefaultAllocator)
+ bldr := array.NewUint16Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(uint16TestVal)
}
return bldr.NewArray()
case arrow.INT32:
- bldr := array.NewInt32Builder(memory.DefaultAllocator)
+ bldr := array.NewInt32Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(int32TestVal)
}
return bldr.NewArray()
case arrow.UINT32:
- bldr := array.NewUint32Builder(memory.DefaultAllocator)
+ bldr := array.NewUint32Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(uint32TestVal)
}
return bldr.NewArray()
case arrow.INT64:
- bldr := array.NewInt64Builder(memory.DefaultAllocator)
+ bldr := array.NewInt64Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(int64TestVal)
}
return bldr.NewArray()
case arrow.UINT64:
- bldr := array.NewUint64Builder(memory.DefaultAllocator)
+ bldr := array.NewUint64Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(uint64TestVal)
}
return bldr.NewArray()
case arrow.FLOAT32:
- bldr := array.NewFloat32Builder(memory.DefaultAllocator)
+ bldr := array.NewFloat32Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(floatTestVal)
}
return bldr.NewArray()
case arrow.FLOAT64:
- bldr := array.NewFloat64Builder(memory.DefaultAllocator)
+ bldr := array.NewFloat64Builder(mem)
defer bldr.Release()
for i := 0; i < size; i++ {
bldr.Append(doubleTestVal)
@@ -592,12 +619,14 @@ func (ps *ParquetIOTestSuite) makePrimitiveTestCol(size
int, typ arrow.DataType)
return nil
}
-func (ps *ParquetIOTestSuite) makeTestFile(typ arrow.DataType, arr
arrow.Array, numChunks int) []byte {
+func (ps *ParquetIOTestSuite) makeTestFile(mem memory.Allocator, typ
arrow.DataType, arr arrow.Array, numChunks int) []byte {
sc := ps.makeSimpleSchema(typ, parquet.Repetitions.Required)
- sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
- writer := file.NewParquetWriter(sink, sc)
+ sink := encoding.NewBufferWriter(0, mem)
+ defer sink.Release()
+ writer := file.NewParquetWriter(sink, sc,
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
- ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
+ props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
+ ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
rowGroupSize := arr.Len() / numChunks
for i := 0; i < numChunks; i++ {
@@ -606,21 +635,23 @@ func (ps *ParquetIOTestSuite) makeTestFile(typ
arrow.DataType, arr arrow.Array,
ps.NoError(err)
start := i * rowGroupSize
- ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw,
array.NewSlice(arr, int64(start), int64(start+rowGroupSize)), nil, nil, false))
- cw.Close()
- rgw.Close()
+ slc := array.NewSlice(arr, int64(start),
int64(start+rowGroupSize))
+ defer slc.Release()
+ ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw, slc, nil, nil,
false))
+ ps.NoError(cw.Close())
+ ps.NoError(rgw.Close())
}
- writer.Close()
+ ps.NoError(writer.Close())
buf := sink.Finish()
defer buf.Release()
return buf.Bytes()
}
-func (ps *ParquetIOTestSuite) createReader(data []byte) *pqarrow.FileReader {
- rdr, err := file.NewParquetReader(bytes.NewReader(data))
+func (ps *ParquetIOTestSuite) createReader(mem memory.Allocator, data []byte)
*pqarrow.FileReader {
+ rdr, err := file.NewParquetReader(bytes.NewReader(data),
file.WithReadProps(parquet.NewReaderProperties(mem)))
ps.NoError(err)
- reader, err := pqarrow.NewFileReader(rdr,
pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+ reader, err := pqarrow.NewFileReader(rdr,
pqarrow.ArrowReadProperties{}, mem)
ps.NoError(err)
return reader
}
@@ -632,12 +663,12 @@ func (ps *ParquetIOTestSuite) readTable(rdr
*pqarrow.FileReader) arrow.Table {
return tbl
}
-func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(typ
arrow.DataType, numChunks int) {
- values := ps.makePrimitiveTestCol(smallSize, typ)
+func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(mem
memory.Allocator, typ arrow.DataType, numChunks int) {
+ values := ps.makePrimitiveTestCol(mem, smallSize, typ)
defer values.Release()
- data := ps.makeTestFile(typ, values, numChunks)
- reader := ps.createReader(data)
+ data := ps.makeTestFile(mem, typ, values, numChunks)
+ reader := ps.createReader(mem, data)
tbl := ps.readTable(reader)
defer tbl.Release()
@@ -650,19 +681,19 @@ func (ps *ParquetIOTestSuite)
checkSingleColumnRequiredTableRead(typ arrow.DataT
ps.True(array.Equal(values, chunked.Chunk(0)))
}
-func (ps *ParquetIOTestSuite) checkSingleColumnRead(typ arrow.DataType,
numChunks int) {
- values := ps.makePrimitiveTestCol(smallSize, typ)
+func (ps *ParquetIOTestSuite) checkSingleColumnRead(mem memory.Allocator, typ
arrow.DataType, numChunks int) {
+ values := ps.makePrimitiveTestCol(mem, smallSize, typ)
defer values.Release()
- data := ps.makeTestFile(typ, values, numChunks)
- reader := ps.createReader(data)
+ data := ps.makeTestFile(mem, typ, values, numChunks)
+ reader := ps.createReader(mem, data)
cr, err := reader.GetColumn(context.TODO(), 0)
ps.NoError(err)
+ defer cr.Release()
chunked, err := cr.NextBatch(smallSize)
ps.NoError(err)
- defer chunked.Release()
ps.Len(chunked.Chunks(), 1)
ps.True(array.Equal(values, chunked.Chunk(0)))
@@ -674,10 +705,10 @@ func (ps *ParquetIOTestSuite)
TestDateTimeTypesReadWriteTable() {
toWrite := makeDateTimeTypesTable(mem, false, true)
defer toWrite.Release()
- buf := writeTableToBuffer(ps.T(), toWrite, toWrite.NumRows(),
pqarrow.DefaultWriterProps())
+ buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
defer buf.Release()
- reader := ps.createReader(buf.Bytes())
+ reader := ps.createReader(mem, buf.Bytes())
tbl := ps.readTable(reader)
defer tbl.Release()
@@ -703,10 +734,10 @@ func (ps *ParquetIOTestSuite)
TestDateTimeTypesWithInt96ReadWriteTable() {
expected := makeDateTimeTypesTable(mem, false, true)
defer expected.Release()
- buf := writeTableToBuffer(ps.T(), expected, expected.NumRows(),
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
+ buf := writeTableToBuffer(ps.T(), mem, expected, expected.NumRows(),
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
defer buf.Release()
- reader := ps.createReader(buf.Bytes())
+ reader := ps.createReader(mem, buf.Bytes())
tbl := ps.readTable(reader)
defer tbl.Release()
@@ -729,9 +760,9 @@ func (ps *ParquetIOTestSuite)
TestLargeBinaryReadWriteTable() {
// While we may write using LargeString, when we read, we get an
array.String back out.
// So we're building a normal array.String to use with array.Equal
- lsBldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+ lsBldr := array.NewLargeStringBuilder(mem)
defer lsBldr.Release()
- lbBldr := array.NewBinaryBuilder(memory.DefaultAllocator,
arrow.BinaryTypes.LargeBinary)
+ lbBldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.LargeBinary)
defer lbBldr.Release()
for i := 0; i < smallSize; i++ {
@@ -755,8 +786,10 @@ func (ps *ParquetIOTestSuite)
TestLargeBinaryReadWriteTable() {
},
-1,
)
+ defer lsValues.Release() // NewChunked
+ defer lbValues.Release() // NewChunked
defer expected.Release()
- ps.roundTripTable(expected, true)
+ ps.roundTripTable(mem, expected, true)
}
func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
@@ -779,7 +812,9 @@ func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
for _, n := range nchunks {
for _, dt := range types {
ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n),
func() {
- ps.checkSingleColumnRead(dt, n)
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+ ps.checkSingleColumnRead(mem, dt, n)
})
}
}
@@ -805,13 +840,19 @@ func (ps *ParquetIOTestSuite)
TestSingleColumnRequiredRead() {
for _, n := range nchunks {
for _, dt := range types {
ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n),
func() {
- ps.checkSingleColumnRequiredTableRead(dt, n)
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ ps.checkSingleColumnRequiredTableRead(mem, dt,
n)
})
}
}
}
func (ps *ParquetIOTestSuite) TestReadDecimals() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
bigEndian := []parquet.ByteArray{
// 123456
[]byte{1, 226, 64},
@@ -821,7 +862,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
[]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 254, 29, 192},
}
- bldr := array.NewDecimal128Builder(memory.DefaultAllocator,
&arrow.Decimal128Type{Precision: 6, Scale: 3})
+ bldr := array.NewDecimal128Builder(mem,
&arrow.Decimal128Type{Precision: 6, Scale: 3})
defer bldr.Release()
bldr.Append(decimal128.FromU64(123456))
@@ -835,7 +876,8 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
schema.Must(schema.NewPrimitiveNodeLogical("decimals",
parquet.Repetitions.Required, schema.NewDecimalLogicalType(6, 3),
parquet.Types.ByteArray, -1, -1)),
}, -1))
- sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+ sink := encoding.NewBufferWriter(0, mem)
+ defer sink.Release()
writer := file.NewParquetWriter(sink, sc)
rgw := writer.AppendRowGroup()
@@ -845,7 +887,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
rgw.Close()
writer.Close()
- rdr := ps.createReader(sink.Bytes())
+ rdr := ps.createReader(mem, sink.Bytes())
cr, err := rdr.GetColumn(context.TODO(), 0)
ps.NoError(err)
@@ -857,31 +899,32 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
ps.True(array.Equal(expected, chunked.Chunk(0)))
}
-func (ps *ParquetIOTestSuite) writeColumn(sc *schema.GroupNode, values
arrow.Array) []byte {
+func (ps *ParquetIOTestSuite) writeColumn(mem memory.Allocator, sc
*schema.GroupNode, values arrow.Array) []byte {
var buf bytes.Buffer
arrsc, err := pqarrow.FromParquet(schema.NewSchema(sc), nil, nil)
ps.NoError(err)
- writer, err := pqarrow.NewFileWriter(arrsc, &buf,
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)),
pqarrow.DefaultWriterProps())
+ writer, err := pqarrow.NewFileWriter(arrsc, &buf,
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
ps.NoError(err)
writer.NewRowGroup()
ps.NoError(writer.WriteColumnData(values))
+ //defer values.Release()
ps.NoError(writer.Close())
ps.NoError(writer.Close())
return buf.Bytes()
}
-func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(data []byte, values
arrow.Array) {
- reader := ps.createReader(data)
+func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(mem
memory.Allocator, data []byte, values arrow.Array) {
+ reader := ps.createReader(mem, data)
cr, err := reader.GetColumn(context.TODO(), 0)
ps.NoError(err)
ps.NotNil(cr)
+ defer cr.Release()
chunked, err := cr.NextBatch(smallSize)
ps.NoError(err)
- defer chunked.Release()
ps.Len(chunked.Chunks(), 1)
ps.NotNil(chunked.Chunk(0))
@@ -917,17 +960,21 @@ var fullTypeList = []arrow.DataType{
func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() {
for _, dt := range fullTypeList {
ps.Run(dt.Name(), func() {
- values := testutils.RandomNonNull(dt, smallSize)
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ values := testutils.RandomNonNull(mem, dt, smallSize)
+ defer values.Release()
sc := ps.makeSimpleSchema(dt,
parquet.Repetitions.Required)
- data := ps.writeColumn(sc, values)
- ps.readAndCheckSingleColumnFile(data, values)
+ data := ps.writeColumn(mem, sc, values)
+ ps.readAndCheckSingleColumnFile(mem, data, values)
})
}
}
-func (ps *ParquetIOTestSuite) roundTripTable(expected arrow.Table, storeSchema
bool) {
- mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
- defer mem.AssertSize(ps.T(), 0)
+func (ps *ParquetIOTestSuite) roundTripTable(_ memory.Allocator, expected
arrow.Table, storeSchema bool) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator) // FIXME:
currently overriding allocator to isolate leaks between roundTripTable and
caller
+ //defer mem.AssertSize(ps.T(), 0) // FIXME:
known leak
var buf bytes.Buffer
var props pqarrow.ArrowWriterProperties
@@ -940,7 +987,7 @@ func (ps *ParquetIOTestSuite) roundTripTable(expected
arrow.Table, storeSchema b
writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
ps.Require().NoError(pqarrow.WriteTable(expected, &buf,
expected.NumRows(), writeProps, props))
- reader := ps.createReader(buf.Bytes())
+ reader := ps.createReader(mem, buf.Bytes())
defer reader.ParquetReader().Close()
tbl := ps.readTable(reader)
@@ -954,7 +1001,9 @@ func (ps *ParquetIOTestSuite) roundTripTable(expected
arrow.Table, storeSchema b
ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
if exChunk.DataType().ID() != arrow.STRUCT {
- ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)),
"expected: %s\ngot: %s", exChunk.Chunk(0), tblChunk.Chunk(0))
+ exc := exChunk.Chunk(0)
+ tbc := tblChunk.Chunk(0)
+ ps.Truef(array.Equal(exc, tbc), "expected: %T %s\ngot: %T %s",
exc, exc, tbc, tbc)
} else {
// current impl of ArrayEquals for structs doesn't correctly
handle nulls in the parent
// with a non-nullable child when comparing. Since after the
round trip, the data in the
@@ -1082,11 +1131,15 @@ func prepareListOfListTable(dt arrow.DataType, size,
nullCount int, nullablePare
}
func (ps *ParquetIOTestSuite) TestSingleEmptyListsColumnReadWrite() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ //defer mem.AssertSize(ps.T(), 0) // FIXME: known leak
+
expected := prepareEmptyListsTable(smallSize)
- buf := writeTableToBuffer(ps.T(), expected, smallSize,
pqarrow.DefaultWriterProps())
+ defer expected.Release()
+ buf := writeTableToBuffer(ps.T(), mem, expected, smallSize,
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
defer buf.Release()
- reader := ps.createReader(buf.Bytes())
+ reader := ps.createReader(mem, buf.Bytes())
tbl := ps.readTable(reader)
defer tbl.Release()
@@ -1103,10 +1156,14 @@ func (ps *ParquetIOTestSuite)
TestSingleEmptyListsColumnReadWrite() {
func (ps *ParquetIOTestSuite) TestSingleColumnOptionalReadWrite() {
for _, dt := range fullTypeList {
ps.Run(dt.Name(), func() {
+ mem :=
memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
values := testutils.RandomNullable(dt, smallSize, 10)
+ defer values.Release()
sc := ps.makeSimpleSchema(dt,
parquet.Repetitions.Optional)
- data := ps.writeColumn(sc, values)
- ps.readAndCheckSingleColumnFile(data, values)
+ data := ps.writeColumn(mem, sc, values)
+ ps.readAndCheckSingleColumnFile(mem, data, values)
})
}
}
@@ -1116,7 +1173,7 @@ func (ps *ParquetIOTestSuite)
TestSingleNullableListNullableColumnReadWrite() {
ps.Run(dt.Name(), func() {
expected := prepareListTable(dt, smallSize, true, true,
10)
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(memory.DefaultAllocator, expected,
false)
})
}
}
@@ -1126,7 +1183,7 @@ func (ps *ParquetIOTestSuite)
TestSingleRequiredListNullableColumnReadWrite() {
ps.Run(dt.Name(), func() {
expected := prepareListTable(dt, smallSize, false,
true, 10)
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(memory.DefaultAllocator, expected,
false)
})
}
}
@@ -1136,7 +1193,7 @@ func (ps *ParquetIOTestSuite)
TestSingleNullableListRequiredColumnReadWrite() {
ps.Run(dt.Name(), func() {
expected := prepareListTable(dt, smallSize, true,
false, 10)
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(memory.DefaultAllocator, expected,
false)
})
}
}
@@ -1146,7 +1203,7 @@ func (ps *ParquetIOTestSuite)
TestSingleRequiredListRequiredColumnReadWrite() {
ps.Run(dt.Name(), func() {
expected := prepareListTable(dt, smallSize, false,
false, 0)
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(memory.DefaultAllocator, expected,
false)
})
}
}
@@ -1156,16 +1213,19 @@ func (ps *ParquetIOTestSuite)
TestSingleNullableListRequiredListRequiredColumnRe
ps.Run(dt.Name(), func() {
expected := prepareListOfListTable(dt, smallSize, 2,
true, false, false)
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(memory.DefaultAllocator, expected,
false)
})
}
}
func (ps *ParquetIOTestSuite) TestSimpleStruct() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
links := arrow.StructOf(arrow.Field{Name: "Backward", Type:
arrow.PrimitiveTypes.Int64, Nullable: true},
arrow.Field{Name: "Forward", Type: arrow.PrimitiveTypes.Int64,
Nullable: true})
- bldr := array.NewStructBuilder(memory.DefaultAllocator, links)
+ bldr := array.NewStructBuilder(mem, links)
defer bldr.Release()
backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
@@ -1184,14 +1244,18 @@ func (ps *ParquetIOTestSuite) TestSimpleStruct() {
tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links",
Type: links}}, nil),
[]arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links",
Type: links}, arrow.NewChunked(links, []arrow.Array{data}))}, -1)
+ defer data.Release() // NewChunked
defer tbl.Release()
- ps.roundTripTable(tbl, false)
+ ps.roundTripTable(mem, tbl, false)
}
func (ps *ParquetIOTestSuite) TestSingleColumnNullableStruct() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
links := arrow.StructOf(arrow.Field{Name: "Backward", Type:
arrow.PrimitiveTypes.Int64, Nullable: true})
- bldr := array.NewStructBuilder(memory.DefaultAllocator, links)
+ bldr := array.NewStructBuilder(mem, links)
defer bldr.Release()
backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
@@ -1205,14 +1269,18 @@ func (ps *ParquetIOTestSuite)
TestSingleColumnNullableStruct() {
tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links",
Type: links, Nullable: true}}, nil),
[]arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links",
Type: links, Nullable: true}, arrow.NewChunked(links, []arrow.Array{data}))},
-1)
+ defer data.Release() // NewChunked
defer tbl.Release()
- ps.roundTripTable(tbl, false)
+ ps.roundTripTable(mem, tbl, false)
}
func (ps *ParquetIOTestSuite) TestNestedRequiredFieldStruct() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
intField := arrow.Field{Name: "int_array", Type:
arrow.PrimitiveTypes.Int32}
- intBldr := array.NewInt32Builder(memory.DefaultAllocator)
+ intBldr := array.NewInt32Builder(mem)
defer intBldr.Release()
intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, nil)
@@ -1231,14 +1299,18 @@ func (ps *ParquetIOTestSuite)
TestNestedRequiredFieldStruct() {
tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
[]arrow.Column{*arrow.NewColumn(structField,
arrow.NewChunked(structField.Type,
[]arrow.Array{stData}))}, -1)
+ defer stData.Release() // NewChunked
defer tbl.Release()
- ps.roundTripTable(tbl, false)
+ ps.roundTripTable(mem, tbl, false)
}
func (ps *ParquetIOTestSuite) TestNestedNullableField() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
intField := arrow.Field{Name: "int_array", Type:
arrow.PrimitiveTypes.Int32, Nullable: true}
- intBldr := array.NewInt32Builder(memory.DefaultAllocator)
+ intBldr := array.NewInt32Builder(mem)
defer intBldr.Release()
intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, []bool{true,
false, true, false, true, true, false, true})
@@ -1257,12 +1329,16 @@ func (ps *ParquetIOTestSuite) TestNestedNullableField()
{
tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
[]arrow.Column{*arrow.NewColumn(structField,
arrow.NewChunked(structField.Type,
[]arrow.Array{stData}))}, -1)
+ defer stData.Release() // NewChunked
defer tbl.Release()
- ps.roundTripTable(tbl, false)
+ ps.roundTripTable(mem, tbl, false)
}
func (ps *ParquetIOTestSuite) TestCanonicalNestedRoundTrip() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
docIdField := arrow.Field{Name: "DocID", Type:
arrow.PrimitiveTypes.Int64}
linksField := arrow.Field{Name: "Links", Type: arrow.StructOf(
arrow.Field{Name: "Backward", Type:
arrow.ListOf(arrow.PrimitiveTypes.Int64)},
@@ -1278,15 +1354,15 @@ func (ps *ParquetIOTestSuite)
TestCanonicalNestedRoundTrip() {
nameField := arrow.Field{Name: "Name", Type: arrow.ListOf(nameStruct)}
sc := arrow.NewSchema([]arrow.Field{docIdField, linksField, nameField},
nil)
- docIDArr, _, err := array.FromJSON(memory.DefaultAllocator,
docIdField.Type, strings.NewReader("[10, 20]"))
+ docIDArr, _, err := array.FromJSON(mem, docIdField.Type,
strings.NewReader("[10, 20]"))
ps.Require().NoError(err)
defer docIDArr.Release()
- linksIDArr, _, err := array.FromJSON(memory.DefaultAllocator,
linksField.Type, strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]},
{"Backward":[10, 30], "Forward": [80]}]`))
+ linksIDArr, _, err := array.FromJSON(mem, linksField.Type,
strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]}, {"Backward":[10,
30], "Forward": [80]}]`))
ps.Require().NoError(err)
defer linksIDArr.Release()
- nameArr, _, err := array.FromJSON(memory.DefaultAllocator,
nameField.Type, strings.NewReader(`
+ nameArr, _, err := array.FromJSON(mem, nameField.Type,
strings.NewReader(`
[[{"Language": [{"Code": "en_us", "Country": "us"},
{"Code": "en_us",
"Country": null}],
"Url": "http://A"},
@@ -1301,12 +1377,19 @@ func (ps *ParquetIOTestSuite)
TestCanonicalNestedRoundTrip() {
*arrow.NewColumn(linksField, arrow.NewChunked(linksField.Type,
[]arrow.Array{linksIDArr})),
*arrow.NewColumn(nameField, arrow.NewChunked(nameField.Type,
[]arrow.Array{nameArr})),
}, 2)
+ defer docIDArr.Release() // NewChunked
+ defer linksIDArr.Release() // NewChunked
+ defer nameArr.Release() // NewChunked
+ defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(mem, expected, false)
}
func (ps *ParquetIOTestSuite) TestFixedSizeList() {
- bldr := array.NewFixedSizeListBuilder(memory.DefaultAllocator, 3,
arrow.PrimitiveTypes.Int16)
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ bldr := array.NewFixedSizeListBuilder(mem, 3,
arrow.PrimitiveTypes.Int16)
defer bldr.Release()
vb := bldr.ValueBuilder().(*array.Int16Builder)
@@ -1315,15 +1398,24 @@ func (ps *ParquetIOTestSuite) TestFixedSizeList() {
vb.AppendValues([]int16{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
data := bldr.NewArray()
+ defer data.Release() // NewArray
+
field := arrow.Field{Name: "root", Type: data.DataType(), Nullable:
true}
- expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
- []arrow.Column{*arrow.NewColumn(field,
arrow.NewChunked(field.Type, []arrow.Array{data}))}, -1)
+ cnk := arrow.NewChunked(field.Type, []arrow.Array{data})
+ defer data.Release() // NewChunked
+
+ tbl := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
[]arrow.Column{*arrow.NewColumn(field, cnk)}, -1)
+ defer cnk.Release() // NewColumn
+ defer tbl.Release()
- ps.roundTripTable(expected, true)
+ ps.roundTripTable(mem, tbl, true)
}
func (ps *ParquetIOTestSuite) TestNull() {
- bldr := array.NewNullBuilder(memory.DefaultAllocator)
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ bldr := array.NewNullBuilder(mem)
defer bldr.Release()
bldr.AppendNull()
@@ -1340,12 +1432,15 @@ func (ps *ParquetIOTestSuite) TestNull() {
-1,
)
- ps.roundTripTable(expected, true)
+ ps.roundTripTable(mem, expected, true)
}
// ARROW-17169
func (ps *ParquetIOTestSuite) TestNullableListOfStruct() {
- bldr := array.NewListBuilder(memory.DefaultAllocator, arrow.StructOf(
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ bldr := array.NewListBuilder(mem, arrow.StructOf(
arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "b", Type: arrow.BinaryTypes.String},
))
@@ -1374,13 +1469,17 @@ func (ps *ParquetIOTestSuite)
TestNullableListOfStruct() {
field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
[]arrow.Column{*arrow.NewColumn(field,
arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
+ defer arr.Release() // NewChunked
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(mem, expected, false)
}
func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
- bldr := array.NewStructBuilder(memory.DefaultAllocator, arrow.StructOf(
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ bldr := array.NewStructBuilder(mem, arrow.StructOf(
arrow.Field{
Nullable: true,
Name: "l",
@@ -1421,9 +1520,10 @@ func (ps *ParquetIOTestSuite)
TestStructWithListOfNestedStructs() {
field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
[]arrow.Column{*arrow.NewColumn(field,
arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
+ defer arr.Release() // NewChunked
defer expected.Release()
- ps.roundTripTable(expected, false)
+ ps.roundTripTable(mem, expected, false)
}
func TestParquetArrowIO(t *testing.T) {
@@ -1431,6 +1531,9 @@ func TestParquetArrowIO(t *testing.T) {
}
func TestBufferedRecWrite(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
sc := arrow.NewSchema([]arrow.Field{
{Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable:
true},
{Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
@@ -1458,7 +1561,7 @@ func TestBufferedRecWrite(t *testing.T) {
wr, err := pqarrow.NewFileWriter(sc, &buf,
parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy),
parquet.WithDictionaryDefault(false), parquet.WithDataPageSize(100*1024)),
- pqarrow.DefaultWriterProps())
+ pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
require.NoError(t, err)
p1 := rec.NewSlice(0, SIZELEN/2)
@@ -1486,7 +1589,10 @@ func TestBufferedRecWrite(t *testing.T) {
}
func (ps *ParquetIOTestSuite) TestArrowMapTypeRoundTrip() {
- bldr := array.NewMapBuilder(memory.DefaultAllocator,
arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32, false)
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ bldr := array.NewMapBuilder(mem, arrow.BinaryTypes.String,
arrow.PrimitiveTypes.Int32, false)
defer bldr.Release()
kb := bldr.KeyBuilder().(*array.StringBuilder)
@@ -1510,15 +1616,124 @@ func (ps *ParquetIOTestSuite)
TestArrowMapTypeRoundTrip() {
defer arr.Release()
fld := arrow.Field{Name: "mapped", Type: arr.DataType(), Nullable: true}
- tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil),
- []arrow.Column{*arrow.NewColumn(fld,
arrow.NewChunked(arr.DataType(), []arrow.Array{arr}))}, -1)
+ cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+ defer arr.Release() // NewChunked
+ tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil),
[]arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+ defer cnk.Release() // NewColumn
defer tbl.Release()
- ps.roundTripTable(tbl, true)
+ ps.roundTripTable(mem, tbl, true)
+}
+
+func (ps *ParquetIOTestSuite) TestArrowExtensionTypeRoundTrip() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ extBuilder := array.NewExtensionBuilder(mem, types.NewUUIDType())
+ defer extBuilder.Release()
+ builder := types.NewUUIDBuilder(extBuilder)
+ builder.Append(uuid.New())
+ arr := builder.NewArray()
+ defer arr.Release()
+
+ fld := arrow.Field{Name: "uuid", Type: arr.DataType(), Nullable: true}
+ cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+ defer arr.Release() // NewChunked
+ tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil),
[]arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+ defer cnk.Release() // NewColumn
+ defer tbl.Release()
+
+ ps.roundTripTable(mem, tbl, true)
+}
+
+func (ps *ParquetIOTestSuite) TestArrowUnknownExtensionTypeRoundTrip() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ var written, expected arrow.Table
+
+ {
+ // Prepare `written` table with the extension type registered.
+ extType := types.NewUUIDType()
+ bldr := array.NewExtensionBuilder(mem, extType)
+ defer bldr.Release()
+
+ bldr.Builder.(*array.FixedSizeBinaryBuilder).AppendValues(
+ [][]byte{nil, []byte("abcdefghijklmno0"),
[]byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
+ []bool{false, true, true, true})
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ if arrow.GetExtensionType("uuid") != nil {
+ ps.NoError(arrow.UnregisterExtensionType("uuid"))
+ }
+
+ fld := arrow.Field{Name: "uuid", Type: arr.DataType(),
Nullable: true}
+ cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+ defer arr.Release() // NewChunked
+ written = array.NewTable(arrow.NewSchema([]arrow.Field{fld},
nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+ defer cnk.Release() // NewColumn
+ defer written.Release()
+ }
+
+ {
+ // Prepare `expected` table with the extension type
unregistered in the underlying type.
+ bldr := array.NewFixedSizeBinaryBuilder(mem,
&arrow.FixedSizeBinaryType{ByteWidth: 16})
+ defer bldr.Release()
+ bldr.AppendValues(
+ [][]byte{nil, []byte("abcdefghijklmno0"),
[]byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
+ []bool{false, true, true, true})
+
+ arr := bldr.NewArray()
+ defer arr.Release()
+
+ fld := arrow.Field{Name: "uuid", Type: arr.DataType(),
Nullable: true}
+ cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+ defer arr.Release() // NewChunked
+ expected = array.NewTable(arrow.NewSchema([]arrow.Field{fld},
nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+ defer cnk.Release() // NewColumn
+ defer expected.Release()
+ }
+
+ // sanity check before going deeper
+ ps.Equal(expected.NumCols(), written.NumCols())
+ ps.Equal(expected.NumRows(), written.NumRows())
+
+ // just like roundTripTable() but different written vs. expected tables
+ var buf bytes.Buffer
+ props := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema(),
pqarrow.WithAllocator(mem))
+
+ writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
+ ps.Require().NoError(pqarrow.WriteTable(written, &buf,
written.NumRows(), writeProps, props))
+
+ reader := ps.createReader(mem, buf.Bytes())
+ defer reader.ParquetReader().Close()
+
+ tbl := ps.readTable(reader)
+ defer tbl.Release()
+
+ ps.Equal(expected.NumCols(), tbl.NumCols())
+ ps.Equal(expected.NumRows(), tbl.NumRows())
+
+ exChunk := expected.Column(0).Data()
+ tblChunk := tbl.Column(0).Data()
+
+ ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
+ exc := exChunk.Chunk(0)
+ tbc := tblChunk.Chunk(0)
+ ps.Truef(array.Equal(exc, tbc), "expected: %T %s\ngot: %T %s", exc,
exc, tbc, tbc)
+
+ expectedMd := arrow.MetadataFrom(map[string]string{
+ ipc.ExtensionTypeKeyName: "uuid",
+ ipc.ExtensionMetadataKeyName: "uuid-serialized",
+ "PARQUET:field_id": "-1",
+ })
+ ps.Truef(expectedMd.Equal(tbl.Column(0).Field().Metadata), "expected:
%v\ngot: %v", expectedMd, tbl.Column(0).Field().Metadata)
}
func TestWriteTableMemoryAllocation(t *testing.T) {
- allocator := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
sc := arrow.NewSchema([]arrow.Field{
{Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable:
true},
{Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
@@ -1526,9 +1741,10 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
arrow.Field{Name: "i64", Type:
arrow.PrimitiveTypes.Int64, Nullable: true},
arrow.Field{Name: "f64", Type:
arrow.PrimitiveTypes.Float64, Nullable: true})},
{Name: "arr_i64", Type:
arrow.ListOf(arrow.PrimitiveTypes.Int64)},
+ {Name: "uuid", Type: types.NewUUIDType(), Nullable: true},
}, nil)
- bld := array.NewRecordBuilder(allocator, sc)
+ bld := array.NewRecordBuilder(mem, sc)
bld.Field(0).(*array.Float32Builder).Append(1.0)
bld.Field(1).(*array.Int32Builder).Append(1)
sbld := bld.Field(2).(*array.StructBuilder)
@@ -1538,6 +1754,7 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
abld := bld.Field(3).(*array.ListBuilder)
abld.Append(true)
abld.ValueBuilder().(*array.Int64Builder).Append(2)
+
bld.Field(4).(*types.UUIDBuilder).Append(uuid.MustParse("00000000-0000-0000-0000-000000000001"))
rec := bld.NewRecord()
bld.Release()
@@ -1545,12 +1762,12 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
var buf bytes.Buffer
wr, err := pqarrow.NewFileWriter(sc, &buf,
parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy)),
-
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(allocator)))
+ pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
require.NoError(t, err)
require.NoError(t, wr.Write(rec))
rec.Release()
wr.Close()
- require.Zero(t, allocator.CurrentAlloc())
+ require.Zero(t, mem.CurrentAlloc())
}
diff --git a/go/parquet/pqarrow/encode_dictionary_test.go
b/go/parquet/pqarrow/encode_dictionary_test.go
index d56f1bdde9..10ad1cd36b 100644
--- a/go/parquet/pqarrow/encode_dictionary_test.go
+++ b/go/parquet/pqarrow/encode_dictionary_test.go
@@ -61,8 +61,8 @@ func (ps *ParquetIOTestSuite)
TestSingleColumnOptionalDictionaryWrite() {
defer arr.Release()
sc := ps.makeSimpleSchema(arr.DataType(),
parquet.Repetitions.Optional)
- data := ps.writeColumn(sc, arr)
- ps.readAndCheckSingleColumnFile(data, values)
+ data := ps.writeColumn(mem, sc, arr)
+ ps.readAndCheckSingleColumnFile(mem, data, values)
})
}
}
diff --git a/go/parquet/pqarrow/file_reader.go
b/go/parquet/pqarrow/file_reader.go
index f92ac5a3e7..d73f9301e5 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -356,11 +356,9 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context,
indices, rowGroups []in
// pass pairs of reader and column index to the channel for the
// goroutines to read the data
- for idx, r := range readers {
- defer func(r *ColumnReader) {
- r.Release()
- }(r)
- ch <- readerInfo{r, idx}
+ for idx := range readers {
+ defer readers[idx].Release()
+ ch <- readerInfo{readers[idx], idx}
}
close(ch)
diff --git a/go/parquet/pqarrow/file_writer.go
b/go/parquet/pqarrow/file_writer.go
index e4e212eca8..413dcf9ec3 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -32,7 +32,7 @@ import (
)
// WriteTable is a convenience function to create and write a full array.Table
to a parquet file. The schema
-// and columns will be determined by the schema of the table, writing the file
out to the the provided writer.
+// and columns will be determined by the schema of the table, writing the file
out to the provided writer.
// The chunksize will be utilized in order to determine the size of the row
groups.
func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props
*parquet.WriterProperties, arrprops ArrowWriterProperties) error {
writer, err := NewFileWriter(tbl.Schema(), w, props, arrprops)
diff --git a/go/parquet/pqarrow/path_builder.go
b/go/parquet/pqarrow/path_builder.go
index c3f9dc5698..7c55dd8417 100644
--- a/go/parquet/pqarrow/path_builder.go
+++ b/go/parquet/pqarrow/path_builder.go
@@ -400,7 +400,7 @@ func (p *pathBuilder) Visit(arr arrow.Array) error {
p.maybeAddNullable(arr)
larr := arr.(*array.FixedSizeList)
listSize := larr.DataType().(*arrow.FixedSizeListType).Len()
- // technically we could encoded fixed sized lists with two
level encodings
+ // technically we could encode fixed sized lists with two level
encodings
// but we always use 3 level encoding, so we increment def
levels as well
p.info.maxDefLevel++
p.info.maxRepLevel++
@@ -440,7 +440,7 @@ func (p *pathBuilder) Visit(arr arrow.Array) error {
}
return nil
case arrow.EXTENSION:
- return xerrors.New("extension types not implemented yet")
+ return p.Visit(arr.(array.ExtensionArray).Storage())
case arrow.SPARSE_UNION, arrow.DENSE_UNION:
return xerrors.New("union types aren't supported in parquet")
default:
diff --git a/go/parquet/pqarrow/path_builder_test.go
b/go/parquet/pqarrow/path_builder_test.go
index 3dad25dd70..c3ea34ee38 100644
--- a/go/parquet/pqarrow/path_builder_test.go
+++ b/go/parquet/pqarrow/path_builder_test.go
@@ -23,6 +23,8 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -358,6 +360,52 @@ func TestNestedListsSomeNullsSomeEmpty(t *testing.T) {
assert.Equal(t, []int16{0, 0, 2, 2, 1, 1, 0, 2}, result.repLevels)
}
+func TestNestedExtensionListsWithSomeNulls(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(t, 0)
+
+ listType := arrow.ListOf(types.NewUUIDType())
+ bldr := array.NewListBuilder(mem, listType)
+ defer bldr.Release()
+
+ nestedBldr := bldr.ValueBuilder().(*array.ListBuilder)
+ vb := nestedBldr.ValueBuilder().(*types.UUIDBuilder)
+
+ uuid1 := uuid.New()
+ uuid3 := uuid.New()
+ uuid4 := uuid.New()
+ uuid5 := uuid.New()
+
+ // produce: [null, [[uuid1, null, uuid3], null, null], [[uuid4, uuid5]]]
+
+ bldr.AppendNull()
+ bldr.Append(true)
+ nestedBldr.Append(true)
+ vb.Append(uuid1)
+ vb.AppendNull()
+ vb.Append(uuid3)
+ nestedBldr.AppendNull()
+ nestedBldr.AppendNull()
+ bldr.Append(true)
+ nestedBldr.Append(true)
+ vb.AppendValues([]uuid.UUID{uuid4, uuid5}, nil)
+
+ arr := bldr.NewListArray()
+ defer arr.Release()
+
+ mp, err := newMultipathLevelBuilder(arr, true)
+ require.NoError(t, err)
+ defer mp.Release()
+
+ ctx := arrowCtxFromContext(NewArrowWriteContext(context.Background(),
nil))
+ result, err := mp.write(0, ctx)
+ require.NoError(t, err)
+
+ assert.Equal(t, []int16{0, 5, 4, 5, 2, 2, 5, 5}, result.defLevels)
+ assert.Equal(t, []int16{0, 0, 2, 2, 1, 1, 0, 2}, result.repLevels)
+ assert.Equal(t, result.leafArr.NullN(), 1)
+}
+
// triplenested translates to parquet:
//
// optional group bag {
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index b7813d92e6..3b926242d6 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/flight"
+ "github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/file"
@@ -354,7 +355,15 @@ func fieldToNode(name string, field arrow.Field, props
*parquet.WriterProperties
return fieldToNode(name, arrow.Field{Name: name, Type:
dictType.ValueType, Nullable: field.Nullable, Metadata: field.Metadata},
props, arrprops)
case arrow.EXTENSION:
- return nil, xerrors.New("not implemented yet")
+ return fieldToNode(name, arrow.Field{
+ Name: name,
+ Type:
field.Type.(arrow.ExtensionType).StorageType(),
+ Nullable: field.Nullable,
+ Metadata: arrow.MetadataFrom(map[string]string{
+ ipc.ExtensionTypeKeyName:
field.Type.(arrow.ExtensionType).ExtensionName(),
+ ipc.ExtensionMetadataKeyName:
field.Type.(arrow.ExtensionType).Serialize(),
+ }),
+ }, props, arrprops)
case arrow.MAP:
mapType := field.Type.(*arrow.MapType)
keyNode, err := fieldToNode("key", mapType.KeyField(), props,
arrprops)
@@ -948,7 +957,24 @@ func getNestedFactory(origin, inferred arrow.DataType)
func(fieldList []arrow.Fi
func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField)
(modified bool, err error) {
nchildren := len(inferred.Children)
switch origin.Type.ID() {
- case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION:
+ case arrow.EXTENSION:
+ extType := origin.Type.(arrow.ExtensionType)
+ modified, err = applyOriginalStorageMetadata(arrow.Field{
+ Type: extType.StorageType(),
+ Metadata: origin.Metadata,
+ }, inferred)
+ if err != nil {
+ return
+ }
+
+ if !arrow.TypeEqual(extType.StorageType(), inferred.Field.Type)
{
+ return modified, fmt.Errorf("%w: mismatch storage type
'%s' for extension type '%s'",
+ arrow.ErrInvalid, inferred.Field.Type, extType)
+ }
+
+ inferred.Field.Type = extType
+ modified = true
+ case arrow.SPARSE_UNION, arrow.DENSE_UNION:
err = xerrors.New("unimplemented type")
case arrow.STRUCT:
typ := origin.Type.(*arrow.StructType)
@@ -1053,10 +1079,6 @@ func applyOriginalStorageMetadata(origin arrow.Field,
inferred *SchemaField) (mo
}
func applyOriginalMetadata(origin arrow.Field, inferred *SchemaField) (bool,
error) {
- if origin.Type.ID() == arrow.EXTENSION {
- return false, xerrors.New("extension types not implemented yet")
- }
-
return applyOriginalStorageMetadata(origin, inferred)
}
diff --git a/go/parquet/pqarrow/schema_test.go
b/go/parquet/pqarrow/schema_test.go
index 07133fb400..219528a2e3 100644
--- a/go/parquet/pqarrow/schema_test.go
+++ b/go/parquet/pqarrow/schema_test.go
@@ -22,7 +22,9 @@ import (
"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/flight"
+ "github.com/apache/arrow/go/v12/arrow/ipc"
"github.com/apache/arrow/go/v12/arrow/memory"
+ "github.com/apache/arrow/go/v12/internal/types"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/metadata"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
@@ -32,13 +34,20 @@ import (
)
func TestGetOriginSchemaBase64(t *testing.T) {
+ uuidType := types.NewUUIDType()
md := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})
+ extMd := arrow.NewMetadata([]string{ipc.ExtensionMetadataKeyName,
ipc.ExtensionTypeKeyName, "PARQUET:field_id"}, []string{uuidType.Serialize(),
uuidType.ExtensionName(), "-1"})
origArrSc := arrow.NewSchema([]arrow.Field{
{Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
{Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+ {Name: "uuid", Type: uuidType, Metadata: extMd},
}, nil)
arrSerializedSc := flight.SerializeSchema(origArrSc,
memory.DefaultAllocator)
+ if err := arrow.RegisterExtensionType(uuidType); err != nil {
+ t.Fatal(err)
+ }
+ defer arrow.UnregisterExtensionType(uuidType.ExtensionName())
pqschema, err := pqarrow.ToParquet(origArrSc, nil,
pqarrow.DefaultWriterProps())
require.NoError(t, err)
@@ -61,6 +70,40 @@ func TestGetOriginSchemaBase64(t *testing.T) {
}
}
+func TestGetOriginSchemaUnregisteredExtension(t *testing.T) {
+ uuidType := types.NewUUIDType()
+ if err := arrow.RegisterExtensionType(uuidType); err != nil {
+ t.Fatal(err)
+ }
+
+ md := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})
+ origArrSc := arrow.NewSchema([]arrow.Field{
+ {Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
+ {Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+ {Name: "uuid", Type: uuidType, Metadata: md},
+ }, nil)
+ pqschema, err := pqarrow.ToParquet(origArrSc, nil,
pqarrow.DefaultWriterProps())
+ require.NoError(t, err)
+
+ arrSerializedSc := flight.SerializeSchema(origArrSc,
memory.DefaultAllocator)
+ kv := metadata.NewKeyValueMetadata()
+ kv.Append("ARROW:schema",
base64.StdEncoding.EncodeToString(arrSerializedSc))
+
+ arrow.UnregisterExtensionType(uuidType.ExtensionName())
+ arrsc, err := pqarrow.FromParquet(pqschema, nil, kv)
+ require.NoError(t, err)
+
+ extMd := arrow.NewMetadata([]string{ipc.ExtensionMetadataKeyName,
ipc.ExtensionTypeKeyName, "PARQUET:field_id"},
+ []string{uuidType.Serialize(), uuidType.ExtensionName(), "-1"})
+ expArrSc := arrow.NewSchema([]arrow.Field{
+ {Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
+ {Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+ {Name: "uuid", Type: uuidType.StorageType(), Metadata: extMd},
+ }, nil)
+
+ assert.Truef(t, expArrSc.Equal(arrsc), "expected: %s\ngot: %s",
expArrSc, arrsc)
+}
+
func TestToParquetWriterConfig(t *testing.T) {
origSc := arrow.NewSchema([]arrow.Field{
{Name: "f1", Type: arrow.BinaryTypes.String},
diff --git a/testing b/testing
index 47f7b56b25..0c73b9add4 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
+Subproject commit 0c73b9add4e8e30eb9c2690d077fabb269750b27