This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2904b0bf8 GH-34330: [Go][Parquet]: Add Extension type support (#34631)
c2904b0bf8 is described below

commit c2904b0bf8ca142dacd44bced8510b5bea540538
Author: Yevgeny Pats <[email protected]>
AuthorDate: Wed Apr 12 15:59:36 2023 -0400

    GH-34330: [Go][Parquet]: Add Extension type support (#34631)
    
    Follow-up instead of https://github.com/apache/arrow/pull/34356
    * Closes: #34330
    
    This is not yet complete, but I would love some direction on where I should 
add tests and in what other places I should handle the new extension type.
    
    Lead-authored-by: Kemal Hadimli <[email protected]>
    Co-authored-by: Herman Schaaf <[email protected]>
    Co-authored-by: Yevgeny Pats <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/arrow/array/array_test.go                       |   2 +-
 go/arrow/array/dictionary_test.go                  |   2 +-
 go/arrow/array/diff_test.go                        |   2 +-
 go/arrow/array/extension_test.go                   |   2 +-
 go/arrow/array/table_test.go                       |  15 +-
 go/arrow/compute/cast_test.go                      |   2 +-
 go/arrow/compute/internal/exec/span_test.go        |   2 +-
 go/arrow/compute/vector_selection_test.go          |   2 +-
 go/arrow/csv/reader_test.go                        |  15 +-
 go/arrow/csv/writer_test.go                        |   3 +-
 go/arrow/datatype_extension_test.go                |   2 +-
 go/arrow/internal/arrdata/arrdata.go               |   2 +-
 go/arrow/internal/flight_integration/scenario.go   |   2 +-
 .../ipc/cmd/arrow-json-integration-test/main.go    |   2 +-
 go/arrow/ipc/endian_swap_test.go                   |   2 +-
 go/arrow/ipc/metadata_test.go                      |   2 +-
 go/arrow/table.go                                  |  34 +-
 .../testing => internal}/types/extension_types.go  |   8 +-
 .../types/extension_types_test.go}                 |   2 +-
 go/parquet/internal/encoding/types.go              |   6 +
 go/parquet/internal/testutils/random_arrow.go      |  38 +-
 go/parquet/pqarrow/column_readers.go               |  42 +-
 go/parquet/pqarrow/encode_arrow.go                 |  60 +--
 go/parquet/pqarrow/encode_arrow_test.go            | 429 ++++++++++++++++-----
 go/parquet/pqarrow/encode_dictionary_test.go       |   4 +-
 go/parquet/pqarrow/file_reader.go                  |   8 +-
 go/parquet/pqarrow/file_writer.go                  |   2 +-
 go/parquet/pqarrow/path_builder.go                 |   4 +-
 go/parquet/pqarrow/path_builder_test.go            |  48 +++
 go/parquet/pqarrow/schema.go                       |  34 +-
 go/parquet/pqarrow/schema_test.go                  |  43 +++
 testing                                            |   2 +-
 32 files changed, 599 insertions(+), 224 deletions(-)

diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go
index 74eb0c4839..ab1b8831c8 100644
--- a/go/arrow/array/array_test.go
+++ b/go/arrow/array/array_test.go
@@ -22,8 +22,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/internal/testing/tools"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
 )
 
diff --git a/go/arrow/array/dictionary_test.go 
b/go/arrow/array/dictionary_test.go
index 5c999df315..667bf5b24a 100644
--- a/go/arrow/array/dictionary_test.go
+++ b/go/arrow/array/dictionary_test.go
@@ -27,8 +27,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/bitutil"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
diff --git a/go/arrow/array/diff_test.go b/go/arrow/array/diff_test.go
index e2c7820d5f..55f496b4db 100644
--- a/go/arrow/array/diff_test.go
+++ b/go/arrow/array/diff_test.go
@@ -26,8 +26,8 @@ import (
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
 )
 
 type diffTestCase struct {
diff --git a/go/arrow/array/extension_test.go b/go/arrow/array/extension_test.go
index 121bf22358..05f3fc26b6 100644
--- a/go/arrow/array/extension_test.go
+++ b/go/arrow/array/extension_test.go
@@ -21,8 +21,8 @@ import (
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/suite"
 )
 
diff --git a/go/arrow/array/table_test.go b/go/arrow/array/table_test.go
index 62437104d6..67cab2066e 100644
--- a/go/arrow/array/table_test.go
+++ b/go/arrow/array/table_test.go
@@ -17,6 +17,7 @@
 package array_test
 
 import (
+       "errors"
        "fmt"
        "reflect"
        "testing"
@@ -157,7 +158,17 @@ func TestChunkedInvalid(t *testing.T) {
                if e == nil {
                        t.Fatalf("expected a panic")
                }
-               if got, want := e.(string), "arrow/array: mismatch data type"; 
got != want {
+
+               err, ok := e.(error)
+               if !ok {
+                       t.Fatalf("expected an error")
+               }
+
+               if !errors.Is(err, arrow.ErrInvalid) {
+                       t.Fatalf("should be an ErrInvalid")
+               }
+
+               if got, want := err.Error(), fmt.Sprintf("%s: arrow/array: 
mismatch data type float64 vs int32", arrow.ErrInvalid); got != want {
                        t.Fatalf("invalid error. got=%q, want=%q", got, want)
                }
        }()
@@ -313,7 +324,7 @@ func TestColumn(t *testing.T) {
                                return c
                        }(),
                        field: arrow.Field{Name: "f32", Type: 
arrow.PrimitiveTypes.Float32},
-                       err:   fmt.Errorf("arrow/array: inconsistent data 
type"),
+                       err:   fmt.Errorf("%w: arrow/array: inconsistent data 
type float64 vs float32", arrow.ErrInvalid),
                },
        } {
                t.Run("", func(t *testing.T) {
diff --git a/go/arrow/compute/cast_test.go b/go/arrow/compute/cast_test.go
index 5c9c7f9cd9..921143bee4 100644
--- a/go/arrow/compute/cast_test.go
+++ b/go/arrow/compute/cast_test.go
@@ -33,9 +33,9 @@ import (
        "github.com/apache/arrow/go/v12/arrow/decimal128"
        "github.com/apache/arrow/go/v12/arrow/decimal256"
        "github.com/apache/arrow/go/v12/arrow/internal/testing/gen"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/apache/arrow/go/v12/arrow/scalar"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
diff --git a/go/arrow/compute/internal/exec/span_test.go 
b/go/arrow/compute/internal/exec/span_test.go
index 52409daadb..c62113e48c 100644
--- a/go/arrow/compute/internal/exec/span_test.go
+++ b/go/arrow/compute/internal/exec/span_test.go
@@ -29,9 +29,9 @@ import (
        "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
        "github.com/apache/arrow/go/v12/arrow/endian"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/apache/arrow/go/v12/arrow/scalar"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
 )
 
diff --git a/go/arrow/compute/vector_selection_test.go 
b/go/arrow/compute/vector_selection_test.go
index b328f5fdf4..209e9b8d59 100644
--- a/go/arrow/compute/vector_selection_test.go
+++ b/go/arrow/compute/vector_selection_test.go
@@ -30,9 +30,9 @@ import (
        "github.com/apache/arrow/go/v12/arrow/compute/internal/exec"
        "github.com/apache/arrow/go/v12/arrow/compute/internal/kernels"
        "github.com/apache/arrow/go/v12/arrow/internal/testing/gen"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/apache/arrow/go/v12/arrow/scalar"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
diff --git a/go/arrow/csv/reader_test.go b/go/arrow/csv/reader_test.go
index c5bd90d76e..560354fb64 100644
--- a/go/arrow/csv/reader_test.go
+++ b/go/arrow/csv/reader_test.go
@@ -31,8 +31,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow/csv"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
        "github.com/apache/arrow/go/v12/arrow/decimal256"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -165,10 +165,10 @@ func Example_withChunk() {
 }
 
 func TestCSVReadInvalidFields(t *testing.T) {
-  tests := []struct {
-               Name string
-               Data string
-               Fields []arrow.Field
+       tests := []struct {
+               Name          string
+               Data          string
+               Fields        []arrow.Field
                ExpectedError bool
        }{
                {
@@ -201,13 +201,14 @@ func TestCSVReadInvalidFields(t *testing.T) {
                t.Run(tc.Name, func(t *testing.T) {
                        f := bytes.NewBufferString(tc.Data)
                        schema := arrow.NewSchema(tc.Fields, nil)
-               
+
                        r := csv.NewReader(
                                f, schema,
                                csv.WithComma(','),
                        )
                        defer r.Release()
-                       for r.Next() {}
+                       for r.Next() {
+                       }
                        parseErr := r.Err()
                        if tc.ExpectedError && parseErr == nil {
                                t.Fatal("Expected error, but none found")
diff --git a/go/arrow/csv/writer_test.go b/go/arrow/csv/writer_test.go
index bfe0bcddf9..9f1b524ce7 100644
--- a/go/arrow/csv/writer_test.go
+++ b/go/arrow/csv/writer_test.go
@@ -30,8 +30,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow/csv"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
        "github.com/apache/arrow/go/v12/arrow/decimal256"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/google/uuid"
 )
 
@@ -257,7 +257,6 @@ func testCSVWriter(t *testing.T, data [][]string, 
writeHeader bool, fmtr func(bo
        listBuilderInt64.AppendValues([]int64{7, 8, 9}, nil)
        b.Field(18).(*array.BinaryBuilder).AppendValues([][]byte{{0, 1, 2}, {3, 
4, 5}, {}}, nil)
        
b.Field(19).(*types.UUIDBuilder).AppendValues([]uuid.UUID{uuid.MustParse("00000000-0000-0000-0000-000000000001"),
 uuid.MustParse("00000000-0000-0000-0000-000000000002"), 
uuid.MustParse("00000000-0000-0000-0000-000000000003")}, nil)
-       
 
        for _, field := range b.Fields() {
                field.AppendNull()
diff --git a/go/arrow/datatype_extension_test.go 
b/go/arrow/datatype_extension_test.go
index c9a62aa5d8..0458437b09 100644
--- a/go/arrow/datatype_extension_test.go
+++ b/go/arrow/datatype_extension_test.go
@@ -21,7 +21,7 @@ import (
        "testing"
 
        "github.com/apache/arrow/go/v12/arrow"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/suite"
 )
diff --git a/go/arrow/internal/arrdata/arrdata.go 
b/go/arrow/internal/arrdata/arrdata.go
index aa05939356..ae4f917ca1 100644
--- a/go/arrow/internal/arrdata/arrdata.go
+++ b/go/arrow/internal/arrdata/arrdata.go
@@ -25,9 +25,9 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
        "github.com/apache/arrow/go/v12/arrow/float16"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
 )
 
 var (
diff --git a/go/arrow/internal/flight_integration/scenario.go 
b/go/arrow/internal/flight_integration/scenario.go
index f2d2d693d9..b8b214ac7d 100644
--- a/go/arrow/internal/flight_integration/scenario.go
+++ b/go/arrow/internal/flight_integration/scenario.go
@@ -34,9 +34,9 @@ import (
        "github.com/apache/arrow/go/v12/arrow/flight/flightsql"
        "github.com/apache/arrow/go/v12/arrow/flight/flightsql/schema_ref"
        "github.com/apache/arrow/go/v12/arrow/internal/arrjson"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "golang.org/x/xerrors"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
diff --git a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go 
b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
index ad78b61f2e..c7e5e3c14b 100644
--- a/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
+++ b/go/arrow/ipc/cmd/arrow-json-integration-test/main.go
@@ -26,8 +26,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/arrio"
        "github.com/apache/arrow/go/v12/arrow/internal/arrjson"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/ipc"
+       "github.com/apache/arrow/go/v12/internal/types"
 )
 
 func main() {
diff --git a/go/arrow/ipc/endian_swap_test.go b/go/arrow/ipc/endian_swap_test.go
index 146f7b08e1..73561e42e8 100644
--- a/go/arrow/ipc/endian_swap_test.go
+++ b/go/arrow/ipc/endian_swap_test.go
@@ -23,8 +23,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/endian"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
diff --git a/go/arrow/ipc/metadata_test.go b/go/arrow/ipc/metadata_test.go
index 805b1c5e1e..dc24164c28 100644
--- a/go/arrow/ipc/metadata_test.go
+++ b/go/arrow/ipc/metadata_test.go
@@ -25,8 +25,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/internal/dictutils"
        "github.com/apache/arrow/go/v12/arrow/internal/flatbuf"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        flatbuffers "github.com/google/flatbuffers/go"
        "github.com/stretchr/testify/assert"
 )
diff --git a/go/arrow/table.go b/go/arrow/table.go
index 467102c00c..0d20d955ab 100644
--- a/go/arrow/table.go
+++ b/go/arrow/table.go
@@ -17,6 +17,7 @@
 package arrow
 
 import (
+       "fmt"
        "sync/atomic"
 
        "github.com/apache/arrow/go/v12/arrow/internal/debug"
@@ -42,20 +43,19 @@ type Table interface {
 // To get strongly typed data from a Column, you need to iterate the
 // chunks and type assert each individual Array. For example:
 //
-//             switch column.DataType().ID {
-//             case arrow.INT32:
-//                     for _, c := range column.Data().Chunks() {
-//                             arr := c.(*array.Int32)
-//                             // do something with arr
-//                     }
-//             case arrow.INT64:
-//                     for _, c := range column.Data().Chunks() {
-//                             arr := c.(*array.Int64)
-//                             // do something with arr
-//                     }
-//             case ...
+//     switch column.DataType().ID {
+//     case arrow.INT32:
+//             for _, c := range column.Data().Chunks() {
+//                     arr := c.(*array.Int32)
+//                     // do something with arr
 //             }
-//
+//     case arrow.INT64:
+//             for _, c := range column.Data().Chunks() {
+//                     arr := c.(*array.Int64)
+//                     // do something with arr
+//             }
+//     case ...
+//     }
 type Column struct {
        field Field
        data  *Chunked
@@ -69,7 +69,7 @@ type Column struct {
 // of the ref counting.
 func NewColumnFromArr(field Field, arr Array) Column {
        if !TypeEqual(field.Type, arr.DataType()) {
-               panic("arrow/array: inconsistent data type")
+               panic(fmt.Errorf("%w: arrow/array: inconsistent data type %s vs 
%s", ErrInvalid, field.Type, arr.DataType()))
        }
 
        arr.Retain()
@@ -98,7 +98,7 @@ func NewColumn(field Field, chunks *Chunked) *Column {
 
        if !TypeEqual(col.data.DataType(), col.field.Type) {
                col.data.Release()
-               panic("arrow/array: inconsistent data type")
+               panic(fmt.Errorf("%w: arrow/array: inconsistent data type %s vs 
%s", ErrInvalid, col.data.DataType(), col.field.Type))
        }
 
        return &col
@@ -148,9 +148,9 @@ func NewChunked(dtype DataType, chunks []Array) *Chunked {
                if chunk == nil {
                        continue
                }
-               
+
                if !TypeEqual(chunk.DataType(), dtype) {
-                       panic("arrow/array: mismatch data type")
+                       panic(fmt.Errorf("%w: arrow/array: mismatch data type 
%s vs %s", ErrInvalid, chunk.DataType().String(), dtype.String()))
                }
                chunk.Retain()
                arr.chunks = append(arr.chunks, chunk)
diff --git a/go/arrow/internal/testing/types/extension_types.go 
b/go/internal/types/extension_types.go
similarity index 98%
rename from go/arrow/internal/testing/types/extension_types.go
rename to go/internal/types/extension_types.go
index 80d8111b4e..bb087a7d34 100644
--- a/go/arrow/internal/testing/types/extension_types.go
+++ b/go/internal/types/extension_types.go
@@ -209,13 +209,13 @@ func (UUIDType) ExtensionName() string { return "uuid" }
 func (UUIDType) Serialize() string { return "uuid-serialized" }
 
 // Deserialize expects storageType to be FixedSizeBinaryType{ByteWidth: 16} 
and the data to be
-// "uuid-serialized" in order to correctly create a UuidType for testing 
deserialize.
+// "uuid-serialized" in order to correctly create a UUIDType for testing 
deserialize.
 func (UUIDType) Deserialize(storageType arrow.DataType, data string) 
(arrow.ExtensionType, error) {
        if string(data) != "uuid-serialized" {
                return nil, fmt.Errorf("type identifier did not match: '%s'", 
string(data))
        }
        if !arrow.TypeEqual(storageType, &arrow.FixedSizeBinaryType{ByteWidth: 
16}) {
-               return nil, fmt.Errorf("invalid storage type for UuidType: %s", 
storageType.Name())
+               return nil, fmt.Errorf("invalid storage type for UUIDType: %s", 
storageType.Name())
        }
        return NewUUIDType(), nil
 }
@@ -258,7 +258,6 @@ func (a Parametric2Array) ValueString(i int) string {
        return fmt.Sprintf("%d", arr.Value(i))
 }
 
-
 // A type where ExtensionName is always the same
 type Parametric1Type struct {
        arrow.ExtensionBase
@@ -266,7 +265,6 @@ type Parametric1Type struct {
        param int32
 }
 
-
 func NewParametric1Type(p int32) *Parametric1Type {
        ret := &Parametric1Type{param: p}
        ret.ExtensionBase.Storage = arrow.PrimitiveTypes.Int32
@@ -516,13 +514,11 @@ func (SmallintType) Deserialize(storageType 
arrow.DataType, data string) (arrow.
 }
 
 var (
-       _ arrow.ExtensionType  = (*UUIDType)(nil)
        _ arrow.ExtensionType  = (*Parametric1Type)(nil)
        _ arrow.ExtensionType  = (*Parametric2Type)(nil)
        _ arrow.ExtensionType  = (*ExtStructType)(nil)
        _ arrow.ExtensionType  = (*DictExtensionType)(nil)
        _ arrow.ExtensionType  = (*SmallintType)(nil)
-       _ array.ExtensionArray = (*UUIDArray)(nil)
        _ array.ExtensionArray = (*Parametric1Array)(nil)
        _ array.ExtensionArray = (*Parametric2Array)(nil)
        _ array.ExtensionArray = (*ExtStructArray)(nil)
diff --git a/go/arrow/internal/testing/types/extension_test.go 
b/go/internal/types/extension_types_test.go
similarity index 97%
rename from go/arrow/internal/testing/types/extension_test.go
rename to go/internal/types/extension_types_test.go
index a0c07cdda3..39372bdf7d 100644
--- a/go/arrow/internal/testing/types/extension_test.go
+++ b/go/internal/types/extension_types_test.go
@@ -23,8 +23,8 @@ import (
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
-       "github.com/apache/arrow/go/v12/arrow/internal/testing/types"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
diff --git a/go/parquet/internal/encoding/types.go 
b/go/parquet/internal/encoding/types.go
index 83da564bdf..a53c98e979 100644
--- a/go/parquet/internal/encoding/types.go
+++ b/go/parquet/internal/encoding/types.go
@@ -339,6 +339,12 @@ func (b *BufferWriter) Finish() *memory.Buffer {
        return buf
 }
 
+// Release the underlying buffer and not allocate anything else. To re-use 
this buffer, Reset() or Finish() should be called
+func (b *BufferWriter) Release() {
+       b.buffer.Release()
+       b.buffer = nil
+}
+
 func (b *BufferWriter) Truncate() {
        b.pos = 0
        b.offset = 0
diff --git a/go/parquet/internal/testutils/random_arrow.go 
b/go/parquet/internal/testutils/random_arrow.go
index 719a9f5dd0..71be6c5d63 100644
--- a/go/parquet/internal/testutils/random_arrow.go
+++ b/go/parquet/internal/testutils/random_arrow.go
@@ -33,80 +33,80 @@ import (
 // binary will have each value between length 2 and 12 but random bytes that 
are not limited to ascii
 // fixed size binary will all be of length 10, random bytes are not limited to 
ascii
 // bool will be approximately half false and half true randomly.
-func RandomNonNull(dt arrow.DataType, size int) arrow.Array {
+func RandomNonNull(mem memory.Allocator, dt arrow.DataType, size int) 
arrow.Array {
        switch dt.ID() {
        case arrow.FLOAT32:
-               bldr := array.NewFloat32Builder(memory.DefaultAllocator)
+               bldr := array.NewFloat32Builder(mem)
                defer bldr.Release()
                values := make([]float32, size)
                FillRandomFloat32(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.FLOAT64:
-               bldr := array.NewFloat64Builder(memory.DefaultAllocator)
+               bldr := array.NewFloat64Builder(mem)
                defer bldr.Release()
                values := make([]float64, size)
                FillRandomFloat64(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.INT64:
-               bldr := array.NewInt64Builder(memory.DefaultAllocator)
+               bldr := array.NewInt64Builder(mem)
                defer bldr.Release()
                values := make([]int64, size)
                FillRandomInt64(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.UINT64:
-               bldr := array.NewUint64Builder(memory.DefaultAllocator)
+               bldr := array.NewUint64Builder(mem)
                defer bldr.Release()
                values := make([]uint64, size)
                FillRandomUint64(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.INT32:
-               bldr := array.NewInt32Builder(memory.DefaultAllocator)
+               bldr := array.NewInt32Builder(mem)
                defer bldr.Release()
                values := make([]int32, size)
                FillRandomInt32(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.UINT32:
-               bldr := array.NewUint32Builder(memory.DefaultAllocator)
+               bldr := array.NewUint32Builder(mem)
                defer bldr.Release()
                values := make([]uint32, size)
                FillRandomUint32(0, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.INT16:
-               bldr := array.NewInt16Builder(memory.DefaultAllocator)
+               bldr := array.NewInt16Builder(mem)
                defer bldr.Release()
                values := make([]int16, size)
                FillRandomInt16(0, 0, 64, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.UINT16:
-               bldr := array.NewUint16Builder(memory.DefaultAllocator)
+               bldr := array.NewUint16Builder(mem)
                defer bldr.Release()
                values := make([]uint16, size)
                FillRandomUint16(0, 0, 64, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.INT8:
-               bldr := array.NewInt8Builder(memory.DefaultAllocator)
+               bldr := array.NewInt8Builder(mem)
                defer bldr.Release()
                values := make([]int8, size)
                FillRandomInt8(0, 0, 64, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.UINT8:
-               bldr := array.NewUint8Builder(memory.DefaultAllocator)
+               bldr := array.NewUint8Builder(mem)
                defer bldr.Release()
                values := make([]uint8, size)
                FillRandomUint8(0, 0, 64, values)
                bldr.AppendValues(values, nil)
                return bldr.NewArray()
        case arrow.DATE32:
-               bldr := array.NewDate32Builder(memory.DefaultAllocator)
+               bldr := array.NewDate32Builder(mem)
                defer bldr.Release()
                values := make([]int32, size)
                FillRandomInt32Max(0, 24, values)
@@ -118,7 +118,7 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array 
{
                bldr.AppendValues(dates, nil)
                return bldr.NewArray()
        case arrow.DATE64:
-               bldr := array.NewDate64Builder(memory.DefaultAllocator)
+               bldr := array.NewDate64Builder(mem)
                defer bldr.Release()
                values := make([]int64, size)
                FillRandomInt64Max(0, 24, values)
@@ -130,21 +130,21 @@ func RandomNonNull(dt arrow.DataType, size int) 
arrow.Array {
                bldr.AppendValues(dates, nil)
                return bldr.NewArray()
        case arrow.STRING:
-               bldr := array.NewStringBuilder(memory.DefaultAllocator)
+               bldr := array.NewStringBuilder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append("test-string")
                }
                return bldr.NewArray()
        case arrow.LARGE_STRING:
-               bldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+               bldr := array.NewLargeStringBuilder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append("test-large-string")
                }
                return bldr.NewArray()
        case arrow.BINARY, arrow.LARGE_BINARY:
-               bldr := array.NewBinaryBuilder(memory.DefaultAllocator, 
dt.(arrow.BinaryDataType))
+               bldr := array.NewBinaryBuilder(mem, dt.(arrow.BinaryDataType))
                defer bldr.Release()
 
                buf := make([]byte, 12)
@@ -156,7 +156,7 @@ func RandomNonNull(dt arrow.DataType, size int) arrow.Array 
{
                }
                return bldr.NewArray()
        case arrow.FIXED_SIZE_BINARY:
-               bldr := 
array.NewFixedSizeBinaryBuilder(memory.DefaultAllocator, 
&arrow.FixedSizeBinaryType{ByteWidth: 10})
+               bldr := array.NewFixedSizeBinaryBuilder(mem, 
&arrow.FixedSizeBinaryType{ByteWidth: 10})
                defer bldr.Release()
 
                buf := make([]byte, 10)
@@ -168,14 +168,14 @@ func RandomNonNull(dt arrow.DataType, size int) 
arrow.Array {
                return bldr.NewArray()
        case arrow.DECIMAL:
                dectype := dt.(*arrow.Decimal128Type)
-               bldr := array.NewDecimal128Builder(memory.DefaultAllocator, 
dectype)
+               bldr := array.NewDecimal128Builder(mem, dectype)
                defer bldr.Release()
 
                data := RandomDecimals(int64(size), 0, dectype.Precision)
                bldr.AppendValues(arrow.Decimal128Traits.CastFromBytes(data), 
nil)
                return bldr.NewArray()
        case arrow.BOOL:
-               bldr := array.NewBooleanBuilder(memory.DefaultAllocator)
+               bldr := array.NewBooleanBuilder(mem)
                defer bldr.Release()
 
                values := make([]bool, size)
diff --git a/go/parquet/pqarrow/column_readers.go 
b/go/parquet/pqarrow/column_readers.go
index e0179f8340..1bcaa7adf7 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -18,6 +18,7 @@ package pqarrow
 
 import (
        "encoding/binary"
+       "errors"
        "fmt"
        "reflect"
        "sync"
@@ -35,7 +36,6 @@ import (
        "github.com/apache/arrow/go/v12/parquet/file"
        "github.com/apache/arrow/go/v12/parquet/schema"
        "golang.org/x/sync/errgroup"
-       "golang.org/x/xerrors"
 )
 
 // column reader for leaf columns (non-nested)
@@ -201,7 +201,7 @@ func (sr *structReader) IsOrHasRepeatedChild() bool { 
return sr.hasRepeatedChild
 
 func (sr *structReader) GetDefLevels() ([]int16, error) {
        if len(sr.children) == 0 {
-               return nil, xerrors.New("struct raeder has no children")
+               return nil, errors.New("struct reader has no children")
        }
 
        // this method should only be called when this struct or one of its 
parents
@@ -212,7 +212,7 @@ func (sr *structReader) GetDefLevels() ([]int16, error) {
 
 func (sr *structReader) GetRepLevels() ([]int16, error) {
        if len(sr.children) == 0 {
-               return nil, xerrors.New("struct raeder has no children")
+               return nil, errors.New("struct reader has no children")
        }
 
        // this method should only be called when this struct or one of its 
parents
@@ -453,14 +453,19 @@ func chunksToSingle(chunked *arrow.Chunked) 
(arrow.ArrayData, error) {
        case 1:
                return chunked.Chunk(0).Data(), nil
        default: // if an item reader yields a chunked array, this is not yet 
implemented
-               return nil, xerrors.New("not implemented")
+               return nil, arrow.ErrNotImplemented
        }
 }
 
 // create a chunked arrow array from the raw record data
 func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr 
*schema.Column, mem memory.Allocator) (*arrow.Chunked, error) {
+       dt := valueType
+       if valueType.ID() == arrow.EXTENSION {
+               dt = valueType.(arrow.ExtensionType).StorageType()
+       }
+
        var data arrow.ArrayData
-       switch valueType.ID() {
+       switch dt.ID() {
        case arrow.DICTIONARY:
                return transferDictionary(rdr, valueType), nil
        case arrow.NULL:
@@ -490,7 +495,7 @@ func transferColumnData(rdr file.RecordReader, valueType 
arrow.DataType, descr *
                case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
                        return 
transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType)
                default:
-                       return nil, xerrors.New("physical type for decimal128 
must be int32, int64, bytearray or fixed len byte array")
+                       return nil, errors.New("physical type for decimal128 
must be int32, int64, bytearray or fixed len byte array")
                }
        case arrow.TIMESTAMP:
                tstype := valueType.(*arrow.TimestampType)
@@ -504,7 +509,7 @@ func transferColumnData(rdr file.RecordReader, valueType 
arrow.DataType, descr *
                                data = transferZeroCopy(rdr, valueType)
                        }
                default:
-                       return nil, xerrors.New("time unit not supported")
+                       return nil, errors.New("time unit not supported")
                }
        default:
                return nil, fmt.Errorf("no support for reading columns of type: 
%s", valueType.Name())
@@ -538,12 +543,23 @@ func transferBinary(rdr file.RecordReader, dt 
arrow.DataType) *arrow.Chunked {
                return transferDictionary(brdr, 
&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: dt})
        }
        chunks := brdr.GetBuilderChunks()
-       if dt == arrow.BinaryTypes.String || dt == 
arrow.BinaryTypes.LargeString {
-               // convert chunks from binary to string without copying data,
-               // just changing the interpretation of the metadata
+       switch {
+       case dt.ID() == arrow.EXTENSION:
+               etype := dt.(arrow.ExtensionType)
+               for idx, chk := range chunks {
+                       chunks[idx] = array.NewExtensionArrayWithStorage(etype, 
chk)
+                       chk.Release() // NewExtensionArrayWithStorage will call 
retain on chk, so it still needs to be released
+                       defer chunks[idx].Release()
+               }
+       case dt == arrow.BinaryTypes.String || dt == 
arrow.BinaryTypes.LargeString:
                for idx := range chunks {
+                       prev := chunks[idx]
                        chunks[idx] = array.MakeFromData(chunks[idx].Data())
-                       defer chunks[idx].Data().Release()
+                       prev.Release()
+                       defer chunks[idx].Release()
+               }
+       default:
+               for idx := range chunks {
                        defer chunks[idx].Release()
                }
        }
@@ -639,8 +655,10 @@ func transferBool(rdr file.RecordReader) arrow.ArrayData {
        if bitmap != nil {
                defer bitmap.Release()
        }
+       bb := memory.NewBufferBytes(data)
+       defer bb.Release()
        return array.NewData(&arrow.BooleanType{}, length, []*memory.Buffer{
-               bitmap, memory.NewBufferBytes(data),
+               bitmap, bb,
        }, nil, int(rdr.NullCount()), 0)
 }
 
diff --git a/go/parquet/pqarrow/encode_arrow.go 
b/go/parquet/pqarrow/encode_arrow.go
index a9b9e0b16e..a467686720 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -19,6 +19,7 @@ package pqarrow
 import (
        "context"
        "encoding/binary"
+       "errors"
        "fmt"
        "time"
        "unsafe"
@@ -31,13 +32,14 @@ import (
        "github.com/apache/arrow/go/v12/internal/utils"
        "github.com/apache/arrow/go/v12/parquet"
        "github.com/apache/arrow/go/v12/parquet/file"
-       "golang.org/x/xerrors"
 )
 
 // get the count of the number of leaf arrays for the type
 func calcLeafCount(dt arrow.DataType) int {
        switch dt.ID() {
-       case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION:
+       case arrow.EXTENSION:
+               return calcLeafCount(dt.(arrow.ExtensionType).StorageType())
+       case arrow.SPARSE_UNION, arrow.DENSE_UNION:
                panic("arrow type not implemented")
        case arrow.DICTIONARY:
                return calcLeafCount(dt.(*arrow.DictionaryType).ValueType)
@@ -112,7 +114,7 @@ func NewArrowColumnWriter(data *arrow.Chunked, offset, size 
int64, manifest *Sch
        }
 
        if absPos >= int64(data.Len()) {
-               return ArrowColumnWriter{}, xerrors.New("cannot write data at 
offset past end of chunked array")
+               return ArrowColumnWriter{}, errors.New("cannot write data at 
offset past end of chunked array")
        }
 
        leafCount := calcLeafCount(data.DataType())
@@ -188,7 +190,7 @@ func (acw *ArrowColumnWriter) Write(ctx context.Context) 
error {
                        defer res.Release()
 
                        if len(res.postListVisitedElems) != 1 {
-                               return xerrors.New("lists with non-zero length 
null components are not supported")
+                               return errors.New("lists with non-zero length 
null components are not supported")
                        }
                        rng := res.postListVisitedElems[0]
                        values := array.NewSlice(res.leafArr, rng.start, 
rng.end)
@@ -221,10 +223,18 @@ func WriteArrowToColumn(ctx context.Context, cw 
file.ColumnChunkWriter, leafArr
                cw.SetBitsBuffer(buf)
        }
 
+       arrCtx := arrowCtxFromContext(ctx)
+       defer func() {
+               if arrCtx.dataBuffer != nil {
+                       arrCtx.dataBuffer.Release()
+                       arrCtx.dataBuffer = nil
+               }
+       }()
+
        if leafArr.DataType().ID() == arrow.DICTIONARY {
-               return writeDictionaryArrow(arrowCtxFromContext(ctx), cw, 
leafArr, defLevels, repLevels, maybeParentNulls)
+               return writeDictionaryArrow(arrCtx, cw, leafArr, defLevels, 
repLevels, maybeParentNulls)
        }
-       return writeDenseArrow(arrowCtxFromContext(ctx), cw, leafArr, 
defLevels, repLevels, maybeParentNulls)
+       return writeDenseArrow(arrCtx, cw, leafArr, defLevels, repLevels, 
maybeParentNulls)
 }
 
 type binaryarr interface {
@@ -236,6 +246,12 @@ type binary64arr interface {
 }
 
 func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, 
leafArr arrow.Array, defLevels, repLevels []int16, maybeParentNulls bool) (err 
error) {
+       if leafArr.DataType().ID() == arrow.EXTENSION {
+               extensionArray := leafArr.(array.ExtensionArray)
+               // Replace leafArr with its underlying storage array
+               leafArr = extensionArray.Storage()
+       }
+
        noNulls := cw.Descr().SchemaNode().RepetitionType() == 
parquet.Repetitions.Required || leafArr.NullN() == 0
 
        if ctx.dataBuffer == nil {
@@ -250,7 +266,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                // TODO(mtopol): optimize this so that we aren't converting from
                // the bitmap -> []bool -> bitmap anymore
                if leafArr.Len() == 0 {
-                       wr.WriteBatch(nil, defLevels, repLevels)
+                       _, err = wr.WriteBatch(nil, defLevels, repLevels)
                        break
                }
 
@@ -321,7 +337,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                }
 
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(data, defLevels, repLevels)
+                       _, err = wr.WriteBatch(data, defLevels, repLevels)
                } else {
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
@@ -346,7 +362,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                                }
                        } else if (cw.Properties().Version() == parquet.V1_0 || 
cw.Properties().Version() == parquet.V2_4) && tstype.Unit == arrow.Nanosecond {
                                // absent superceding user instructions, when 
writing a Parquet Version <=2.4 File,
-                               // timestamps in nano seconds are coerced to 
microseconds
+                               // timestamps in nanoseconds are coerced to 
microseconds
                                
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
                                data = 
arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
                                p := 
NewArrowWriterProperties(WithCoerceTimestamps(arrow.Microsecond), 
WithTruncatedTimestamps(true))
@@ -383,14 +399,14 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                }
 
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(data, defLevels, repLevels)
+                       _, err = wr.WriteBatch(data, defLevels, repLevels)
                } else {
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
                }
        case *file.Int96ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.TIMESTAMP {
-                       return xerrors.New("unsupported arrow type to write to 
Int96 column")
+                       return errors.New("unsupported arrow type to write to 
Int96 column")
                }
                
ctx.dataBuffer.ResizeNoShrink(parquet.Int96Traits.BytesRequired(leafArr.Len()))
                data := 
parquet.Int96Traits.CastFromBytes(ctx.dataBuffer.Bytes())
@@ -401,26 +417,26 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                }
 
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(data, defLevels, repLevels)
+                       _, err = wr.WriteBatch(data, defLevels, repLevels)
                } else {
                        nulls := leafArr.NullBitmapBytes()
                        wr.WriteBatchSpaced(data, defLevels, repLevels, nulls, 
int64(leafArr.Data().Offset()))
                }
        case *file.Float32ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.FLOAT32 {
-                       return xerrors.New("invalid column type to write to 
Float")
+                       return errors.New("invalid column type to write to 
Float")
                }
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), 
defLevels, repLevels)
+                       _, err = 
wr.WriteBatch(leafArr.(*array.Float32).Float32Values(), defLevels, repLevels)
                } else {
                        
wr.WriteBatchSpaced(leafArr.(*array.Float32).Float32Values(), defLevels, 
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
        case *file.Float64ColumnChunkWriter:
                if leafArr.DataType().ID() != arrow.FLOAT64 {
-                       return xerrors.New("invalid column type to write to 
Float")
+                       return errors.New("invalid column type to write to 
Float")
                }
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), 
defLevels, repLevels)
+                       _, err = 
wr.WriteBatch(leafArr.(*array.Float64).Float64Values(), defLevels, repLevels)
                } else {
                        
wr.WriteBatchSpaced(leafArr.(*array.Float64).Float64Values(), defLevels, 
repLevels, leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
@@ -449,11 +465,11 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                                data[i] = 
parquet.ByteArray(valueBuf[offsets[i]:offsets[i+1]])
                        }
                default:
-                       return xerrors.New(fmt.Sprintf("invalid column type to 
write to ByteArray: %s", leafArr.DataType().Name()))
+                       return fmt.Errorf("%w: invalid column type to write to 
ByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
                }
 
                if !maybeParentNulls && noNulls {
-                       wr.WriteBatch(data, defLevels, repLevels)
+                       _, err = wr.WriteBatch(data, defLevels, repLevels)
                } else {
                        wr.WriteBatchSpaced(data, defLevels, repLevels, 
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                }
@@ -466,7 +482,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                                data[idx] = 
leafArr.(*array.FixedSizeBinary).Value(idx)
                        }
                        if !maybeParentNulls && noNulls {
-                               wr.WriteBatch(data, defLevels, repLevels)
+                               _, err = wr.WriteBatch(data, defLevels, 
repLevels)
                        } else {
                                wr.WriteBatchSpaced(data, defLevels, repLevels, 
leafArr.NullBitmapBytes(), int64(leafArr.Data().Offset()))
                        }
@@ -492,7 +508,7 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                                for idx := range data {
                                        data[idx] = 
fixDecimalEndianness(arr.Value(idx))
                                }
-                               wr.WriteBatch(data, defLevels, repLevels)
+                               _, err = wr.WriteBatch(data, defLevels, 
repLevels)
                        } else {
                                for idx := range data {
                                        if arr.IsValid(idx) {
@@ -502,10 +518,10 @@ func writeDenseArrow(ctx *arrowWriteContext, cw 
file.ColumnChunkWriter, leafArr
                                wr.WriteBatchSpaced(data, defLevels, repLevels, 
arr.NullBitmapBytes(), int64(arr.Data().Offset()))
                        }
                default:
-                       return xerrors.New("unimplemented")
+                       return fmt.Errorf("%w: invalid column type to write to 
FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
                }
        default:
-               return xerrors.New("unknown column writer physical type")
+               return errors.New("unknown column writer physical type")
        }
        return
 }
diff --git a/go/parquet/pqarrow/encode_arrow_test.go 
b/go/parquet/pqarrow/encode_arrow_test.go
index 9eea5b1706..ded398979a 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -30,8 +30,10 @@ import (
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/bitutil"
        "github.com/apache/arrow/go/v12/arrow/decimal128"
+       "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/apache/arrow/go/v12/internal/bitutils"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/apache/arrow/go/v12/internal/utils"
        "github.com/apache/arrow/go/v12/parquet"
        "github.com/apache/arrow/go/v12/parquet/compress"
@@ -40,6 +42,7 @@ import (
        "github.com/apache/arrow/go/v12/parquet/internal/testutils"
        "github.com/apache/arrow/go/v12/parquet/pqarrow"
        "github.com/apache/arrow/go/v12/parquet/schema"
+       "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
@@ -136,7 +139,8 @@ func TestWriteArrowCols(t *testing.T) {
        manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
        require.NoError(t, err)
 
-       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+       sink := encoding.NewBufferWriter(0, mem)
+       defer sink.Release()
        writer := file.NewParquetWriter(sink, psc.Root(), 
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))
 
        srgw := writer.AppendRowGroup()
@@ -230,15 +234,18 @@ func TestWriteArrowInt96(t *testing.T) {
        tbl := makeDateTimeTypesTable(mem, false, false)
        defer tbl.Release()
 
-       props := 
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true))
+       props := 
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true), 
pqarrow.WithAllocator(mem))
+
        psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
        require.NoError(t, err)
 
        manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
        require.NoError(t, err)
 
-       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
-       writer := file.NewParquetWriter(sink, psc.Root())
+       sink := encoding.NewBufferWriter(0, mem)
+       defer sink.Release()
+
+       writer := file.NewParquetWriter(sink, psc.Root(), 
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
 
        srgw := writer.AppendRowGroup()
        ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
@@ -283,8 +290,9 @@ func TestWriteArrowInt96(t *testing.T) {
        assert.EqualValues(t, data.Value(5), vals[4].ToTime().UnixNano())
 }
 
-func writeTableToBuffer(t *testing.T, tbl arrow.Table, rowGroupSize int64, 
props pqarrow.ArrowWriterProperties) *memory.Buffer {
-       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+func writeTableToBuffer(t *testing.T, mem memory.Allocator, tbl arrow.Table, 
rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer {
+       sink := encoding.NewBufferWriter(0, mem)
+       defer sink.Release()
        wrprops := 
parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
        psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props)
        require.NoError(t, err)
@@ -314,13 +322,16 @@ func writeTableToBuffer(t *testing.T, tbl arrow.Table, 
rowGroupSize int64, props
 }
 
 func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) {
-       buf := writeTableToBuffer(t, tbl, rowGroupSize, 
pqarrow.DefaultWriterProps())
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       buf := writeTableToBuffer(t, mem, tbl, rowGroupSize, 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        defer buf.Release()
 
        rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
        require.NoError(t, err)
 
-       ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, 
memory.DefaultAllocator)
+       ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, 
mem)
        require.NoError(t, err)
 
        for i := 0; i < int(tbl.NumCols()); i++ {
@@ -344,6 +355,7 @@ func simpleRoundTrip(t *testing.T, tbl arrow.Table, 
rowGroupSize int64) {
                                assert.True(t, array.Equal(chnk, slc.Chunk(0)))
                        }
                }
+               crdr.Release()
        }
 }
 
@@ -361,11 +373,16 @@ func TestArrowReadWriteTableChunkedCols(t *testing.T) {
        for _, chnksize := range chunkSizes {
                chk := array.NewSlice(arr, offset, offset+int64(chnksize))
                defer chk.Release()
+               defer chk.Release() // for NewChunked below
                chunks = append(chunks, chk)
        }
 
        sc := arrow.NewSchema([]arrow.Field{{Name: "field", Type: 
arr.DataType(), Nullable: true}}, nil)
-       tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0), 
arrow.NewChunked(arr.DataType(), chunks))}, -1)
+
+       chk := arrow.NewChunked(arr.DataType(), chunks)
+       defer chk.Release()
+
+       tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0), 
chk)}, -1)
        defer tbl.Release()
 
        simpleRoundTrip(t, tbl, 2)
@@ -487,6 +504,16 @@ type ParquetIOTestSuite struct {
        suite.Suite
 }
 
+func (ps *ParquetIOTestSuite) SetupTest() {
+       ps.NoError(arrow.RegisterExtensionType(types.NewUUIDType()))
+}
+
+func (ps *ParquetIOTestSuite) TearDownTest() {
+       if arrow.GetExtensionType("uuid") != nil {
+               ps.NoError(arrow.UnregisterExtensionType("uuid"))
+       }
+}
+
 func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep 
parquet.Repetition) *schema.GroupNode {
        byteWidth := int32(-1)
 
@@ -509,80 +536,80 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ 
arrow.DataType, rep parquet.R
        return schema.MustGroup(schema.NewGroupNode("schema", 
parquet.Repetitions.Required, schema.FieldList{pnode}, -1))
 }
 
-func (ps *ParquetIOTestSuite) makePrimitiveTestCol(size int, typ 
arrow.DataType) arrow.Array {
+func (ps *ParquetIOTestSuite) makePrimitiveTestCol(mem memory.Allocator, size 
int, typ arrow.DataType) arrow.Array {
        switch typ.ID() {
        case arrow.BOOL:
-               bldr := array.NewBooleanBuilder(memory.DefaultAllocator)
+               bldr := array.NewBooleanBuilder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(boolTestValue)
                }
                return bldr.NewArray()
        case arrow.INT8:
-               bldr := array.NewInt8Builder(memory.DefaultAllocator)
+               bldr := array.NewInt8Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(int8TestVal)
                }
                return bldr.NewArray()
        case arrow.UINT8:
-               bldr := array.NewUint8Builder(memory.DefaultAllocator)
+               bldr := array.NewUint8Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(uint8TestVal)
                }
                return bldr.NewArray()
        case arrow.INT16:
-               bldr := array.NewInt16Builder(memory.DefaultAllocator)
+               bldr := array.NewInt16Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(int16TestVal)
                }
                return bldr.NewArray()
        case arrow.UINT16:
-               bldr := array.NewUint16Builder(memory.DefaultAllocator)
+               bldr := array.NewUint16Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(uint16TestVal)
                }
                return bldr.NewArray()
        case arrow.INT32:
-               bldr := array.NewInt32Builder(memory.DefaultAllocator)
+               bldr := array.NewInt32Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(int32TestVal)
                }
                return bldr.NewArray()
        case arrow.UINT32:
-               bldr := array.NewUint32Builder(memory.DefaultAllocator)
+               bldr := array.NewUint32Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(uint32TestVal)
                }
                return bldr.NewArray()
        case arrow.INT64:
-               bldr := array.NewInt64Builder(memory.DefaultAllocator)
+               bldr := array.NewInt64Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(int64TestVal)
                }
                return bldr.NewArray()
        case arrow.UINT64:
-               bldr := array.NewUint64Builder(memory.DefaultAllocator)
+               bldr := array.NewUint64Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(uint64TestVal)
                }
                return bldr.NewArray()
        case arrow.FLOAT32:
-               bldr := array.NewFloat32Builder(memory.DefaultAllocator)
+               bldr := array.NewFloat32Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(floatTestVal)
                }
                return bldr.NewArray()
        case arrow.FLOAT64:
-               bldr := array.NewFloat64Builder(memory.DefaultAllocator)
+               bldr := array.NewFloat64Builder(mem)
                defer bldr.Release()
                for i := 0; i < size; i++ {
                        bldr.Append(doubleTestVal)
@@ -592,12 +619,14 @@ func (ps *ParquetIOTestSuite) makePrimitiveTestCol(size 
int, typ arrow.DataType)
        return nil
 }
 
-func (ps *ParquetIOTestSuite) makeTestFile(typ arrow.DataType, arr 
arrow.Array, numChunks int) []byte {
+func (ps *ParquetIOTestSuite) makeTestFile(mem memory.Allocator, typ 
arrow.DataType, arr arrow.Array, numChunks int) []byte {
        sc := ps.makeSimpleSchema(typ, parquet.Repetitions.Required)
-       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
-       writer := file.NewParquetWriter(sink, sc)
+       sink := encoding.NewBufferWriter(0, mem)
+       defer sink.Release()
+       writer := file.NewParquetWriter(sink, sc, 
file.WithWriterProps(parquet.NewWriterProperties(parquet.WithAllocator(mem))))
 
-       ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)
+       props := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
+       ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)
        rowGroupSize := arr.Len() / numChunks
 
        for i := 0; i < numChunks; i++ {
@@ -606,21 +635,23 @@ func (ps *ParquetIOTestSuite) makeTestFile(typ 
arrow.DataType, arr arrow.Array,
                ps.NoError(err)
 
                start := i * rowGroupSize
-               ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw, 
array.NewSlice(arr, int64(start), int64(start+rowGroupSize)), nil, nil, false))
-               cw.Close()
-               rgw.Close()
+               slc := array.NewSlice(arr, int64(start), 
int64(start+rowGroupSize))
+               defer slc.Release()
+               ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw, slc, nil, nil, 
false))
+               ps.NoError(cw.Close())
+               ps.NoError(rgw.Close())
        }
-       writer.Close()
+       ps.NoError(writer.Close())
        buf := sink.Finish()
        defer buf.Release()
        return buf.Bytes()
 }
 
-func (ps *ParquetIOTestSuite) createReader(data []byte) *pqarrow.FileReader {
-       rdr, err := file.NewParquetReader(bytes.NewReader(data))
+func (ps *ParquetIOTestSuite) createReader(mem memory.Allocator, data []byte) 
*pqarrow.FileReader {
+       rdr, err := file.NewParquetReader(bytes.NewReader(data), 
file.WithReadProps(parquet.NewReaderProperties(mem)))
        ps.NoError(err)
 
-       reader, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
+       reader, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, mem)
        ps.NoError(err)
        return reader
 }
@@ -632,12 +663,12 @@ func (ps *ParquetIOTestSuite) readTable(rdr 
*pqarrow.FileReader) arrow.Table {
        return tbl
 }
 
-func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(typ 
arrow.DataType, numChunks int) {
-       values := ps.makePrimitiveTestCol(smallSize, typ)
+func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(mem 
memory.Allocator, typ arrow.DataType, numChunks int) {
+       values := ps.makePrimitiveTestCol(mem, smallSize, typ)
        defer values.Release()
 
-       data := ps.makeTestFile(typ, values, numChunks)
-       reader := ps.createReader(data)
+       data := ps.makeTestFile(mem, typ, values, numChunks)
+       reader := ps.createReader(mem, data)
 
        tbl := ps.readTable(reader)
        defer tbl.Release()
@@ -650,19 +681,19 @@ func (ps *ParquetIOTestSuite) 
checkSingleColumnRequiredTableRead(typ arrow.DataT
        ps.True(array.Equal(values, chunked.Chunk(0)))
 }
 
-func (ps *ParquetIOTestSuite) checkSingleColumnRead(typ arrow.DataType, 
numChunks int) {
-       values := ps.makePrimitiveTestCol(smallSize, typ)
+func (ps *ParquetIOTestSuite) checkSingleColumnRead(mem memory.Allocator, typ 
arrow.DataType, numChunks int) {
+       values := ps.makePrimitiveTestCol(mem, smallSize, typ)
        defer values.Release()
 
-       data := ps.makeTestFile(typ, values, numChunks)
-       reader := ps.createReader(data)
+       data := ps.makeTestFile(mem, typ, values, numChunks)
+       reader := ps.createReader(mem, data)
 
        cr, err := reader.GetColumn(context.TODO(), 0)
        ps.NoError(err)
+       defer cr.Release()
 
        chunked, err := cr.NextBatch(smallSize)
        ps.NoError(err)
-       defer chunked.Release()
 
        ps.Len(chunked.Chunks(), 1)
        ps.True(array.Equal(values, chunked.Chunk(0)))
@@ -674,10 +705,10 @@ func (ps *ParquetIOTestSuite) 
TestDateTimeTypesReadWriteTable() {
 
        toWrite := makeDateTimeTypesTable(mem, false, true)
        defer toWrite.Release()
-       buf := writeTableToBuffer(ps.T(), toWrite, toWrite.NumRows(), 
pqarrow.DefaultWriterProps())
+       buf := writeTableToBuffer(ps.T(), mem, toWrite, toWrite.NumRows(), 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        defer buf.Release()
 
-       reader := ps.createReader(buf.Bytes())
+       reader := ps.createReader(mem, buf.Bytes())
        tbl := ps.readTable(reader)
        defer tbl.Release()
 
@@ -703,10 +734,10 @@ func (ps *ParquetIOTestSuite) 
TestDateTimeTypesWithInt96ReadWriteTable() {
 
        expected := makeDateTimeTypesTable(mem, false, true)
        defer expected.Release()
-       buf := writeTableToBuffer(ps.T(), expected, expected.NumRows(), 
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
+       buf := writeTableToBuffer(ps.T(), mem, expected, expected.NumRows(), 
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
        defer buf.Release()
 
-       reader := ps.createReader(buf.Bytes())
+       reader := ps.createReader(mem, buf.Bytes())
        tbl := ps.readTable(reader)
        defer tbl.Release()
 
@@ -729,9 +760,9 @@ func (ps *ParquetIOTestSuite) 
TestLargeBinaryReadWriteTable() {
 
        // While we may write using LargeString, when we read, we get an 
array.String back out.
        // So we're building a normal array.String to use with array.Equal
-       lsBldr := array.NewLargeStringBuilder(memory.DefaultAllocator)
+       lsBldr := array.NewLargeStringBuilder(mem)
        defer lsBldr.Release()
-       lbBldr := array.NewBinaryBuilder(memory.DefaultAllocator, 
arrow.BinaryTypes.LargeBinary)
+       lbBldr := array.NewBinaryBuilder(mem, arrow.BinaryTypes.LargeBinary)
        defer lbBldr.Release()
 
        for i := 0; i < smallSize; i++ {
@@ -755,8 +786,10 @@ func (ps *ParquetIOTestSuite) 
TestLargeBinaryReadWriteTable() {
                },
                -1,
        )
+       defer lsValues.Release() // NewChunked
+       defer lbValues.Release() // NewChunked
        defer expected.Release()
-       ps.roundTripTable(expected, true)
+       ps.roundTripTable(mem, expected, true)
 }
 
 func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
@@ -779,7 +812,9 @@ func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() {
        for _, n := range nchunks {
                for _, dt := range types {
                        ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), 
func() {
-                               ps.checkSingleColumnRead(dt, n)
+                               mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                               defer mem.AssertSize(ps.T(), 0)
+                               ps.checkSingleColumnRead(mem, dt, n)
                        })
                }
        }
@@ -805,13 +840,19 @@ func (ps *ParquetIOTestSuite) 
TestSingleColumnRequiredRead() {
        for _, n := range nchunks {
                for _, dt := range types {
                        ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), 
func() {
-                               ps.checkSingleColumnRequiredTableRead(dt, n)
+                               mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                               defer mem.AssertSize(ps.T(), 0)
+
+                               ps.checkSingleColumnRequiredTableRead(mem, dt, 
n)
                        })
                }
        }
 }
 
 func (ps *ParquetIOTestSuite) TestReadDecimals() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        bigEndian := []parquet.ByteArray{
                // 123456
                []byte{1, 226, 64},
@@ -821,7 +862,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
                []byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 
255, 255, 254, 29, 192},
        }
 
-       bldr := array.NewDecimal128Builder(memory.DefaultAllocator, 
&arrow.Decimal128Type{Precision: 6, Scale: 3})
+       bldr := array.NewDecimal128Builder(mem, 
&arrow.Decimal128Type{Precision: 6, Scale: 3})
        defer bldr.Release()
 
        bldr.Append(decimal128.FromU64(123456))
@@ -835,7 +876,8 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
                schema.Must(schema.NewPrimitiveNodeLogical("decimals", 
parquet.Repetitions.Required, schema.NewDecimalLogicalType(6, 3), 
parquet.Types.ByteArray, -1, -1)),
        }, -1))
 
-       sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
+       sink := encoding.NewBufferWriter(0, mem)
+       defer sink.Release()
        writer := file.NewParquetWriter(sink, sc)
 
        rgw := writer.AppendRowGroup()
@@ -845,7 +887,7 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
        rgw.Close()
        writer.Close()
 
-       rdr := ps.createReader(sink.Bytes())
+       rdr := ps.createReader(mem, sink.Bytes())
        cr, err := rdr.GetColumn(context.TODO(), 0)
        ps.NoError(err)
 
@@ -857,31 +899,32 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
        ps.True(array.Equal(expected, chunked.Chunk(0)))
 }
 
-func (ps *ParquetIOTestSuite) writeColumn(sc *schema.GroupNode, values 
arrow.Array) []byte {
+func (ps *ParquetIOTestSuite) writeColumn(mem memory.Allocator, sc 
*schema.GroupNode, values arrow.Array) []byte {
        var buf bytes.Buffer
        arrsc, err := pqarrow.FromParquet(schema.NewSchema(sc), nil, nil)
        ps.NoError(err)
 
-       writer, err := pqarrow.NewFileWriter(arrsc, &buf, 
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)), 
pqarrow.DefaultWriterProps())
+       writer, err := pqarrow.NewFileWriter(arrsc, &buf, 
parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)), 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        ps.NoError(err)
 
        writer.NewRowGroup()
        ps.NoError(writer.WriteColumnData(values))
+       //defer values.Release()
        ps.NoError(writer.Close())
        ps.NoError(writer.Close())
 
        return buf.Bytes()
 }
 
-func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(data []byte, values 
arrow.Array) {
-       reader := ps.createReader(data)
+func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(mem 
memory.Allocator, data []byte, values arrow.Array) {
+       reader := ps.createReader(mem, data)
        cr, err := reader.GetColumn(context.TODO(), 0)
        ps.NoError(err)
        ps.NotNil(cr)
+       defer cr.Release()
 
        chunked, err := cr.NextBatch(smallSize)
        ps.NoError(err)
-       defer chunked.Release()
 
        ps.Len(chunked.Chunks(), 1)
        ps.NotNil(chunked.Chunk(0))
@@ -917,17 +960,21 @@ var fullTypeList = []arrow.DataType{
 func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() {
        for _, dt := range fullTypeList {
                ps.Run(dt.Name(), func() {
-                       values := testutils.RandomNonNull(dt, smallSize)
+                       mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                       defer mem.AssertSize(ps.T(), 0)
+
+                       values := testutils.RandomNonNull(mem, dt, smallSize)
+                       defer values.Release()
                        sc := ps.makeSimpleSchema(dt, 
parquet.Repetitions.Required)
-                       data := ps.writeColumn(sc, values)
-                       ps.readAndCheckSingleColumnFile(data, values)
+                       data := ps.writeColumn(mem, sc, values)
+                       ps.readAndCheckSingleColumnFile(mem, data, values)
                })
        }
 }
 
-func (ps *ParquetIOTestSuite) roundTripTable(expected arrow.Table, storeSchema 
bool) {
-       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
-       defer mem.AssertSize(ps.T(), 0)
+func (ps *ParquetIOTestSuite) roundTripTable(_ memory.Allocator, expected 
arrow.Table, storeSchema bool) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator) // FIXME: 
currently overriding allocator to isolate leaks between roundTripTable and 
caller
+       //defer mem.AssertSize(ps.T(), 0)                            // FIXME: 
known leak
 
        var buf bytes.Buffer
        var props pqarrow.ArrowWriterProperties
@@ -940,7 +987,7 @@ func (ps *ParquetIOTestSuite) roundTripTable(expected 
arrow.Table, storeSchema b
        writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
        ps.Require().NoError(pqarrow.WriteTable(expected, &buf, 
expected.NumRows(), writeProps, props))
 
-       reader := ps.createReader(buf.Bytes())
+       reader := ps.createReader(mem, buf.Bytes())
        defer reader.ParquetReader().Close()
 
        tbl := ps.readTable(reader)
@@ -954,7 +1001,9 @@ func (ps *ParquetIOTestSuite) roundTripTable(expected 
arrow.Table, storeSchema b
 
        ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
        if exChunk.DataType().ID() != arrow.STRUCT {
-               ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), 
"expected: %s\ngot: %s", exChunk.Chunk(0), tblChunk.Chunk(0))
+               exc := exChunk.Chunk(0)
+               tbc := tblChunk.Chunk(0)
+               ps.Truef(array.Equal(exc, tbc), "expected: %T %s\ngot: %T %s", 
exc, exc, tbc, tbc)
        } else {
                // current impl of ArrayEquals for structs doesn't correctly 
handle nulls in the parent
                // with a non-nullable child when comparing. Since after the 
round trip, the data in the
@@ -1082,11 +1131,15 @@ func prepareListOfListTable(dt arrow.DataType, size, 
nullCount int, nullablePare
 }
 
 func (ps *ParquetIOTestSuite) TestSingleEmptyListsColumnReadWrite() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       //defer mem.AssertSize(ps.T(), 0) // FIXME: known leak
+
        expected := prepareEmptyListsTable(smallSize)
-       buf := writeTableToBuffer(ps.T(), expected, smallSize, 
pqarrow.DefaultWriterProps())
+       defer expected.Release()
+       buf := writeTableToBuffer(ps.T(), mem, expected, smallSize, 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        defer buf.Release()
 
-       reader := ps.createReader(buf.Bytes())
+       reader := ps.createReader(mem, buf.Bytes())
        tbl := ps.readTable(reader)
        defer tbl.Release()
 
@@ -1103,10 +1156,14 @@ func (ps *ParquetIOTestSuite) 
TestSingleEmptyListsColumnReadWrite() {
 func (ps *ParquetIOTestSuite) TestSingleColumnOptionalReadWrite() {
        for _, dt := range fullTypeList {
                ps.Run(dt.Name(), func() {
+                       mem := 
memory.NewCheckedAllocator(memory.DefaultAllocator)
+                       defer mem.AssertSize(ps.T(), 0)
+
                        values := testutils.RandomNullable(dt, smallSize, 10)
+                       defer values.Release()
                        sc := ps.makeSimpleSchema(dt, 
parquet.Repetitions.Optional)
-                       data := ps.writeColumn(sc, values)
-                       ps.readAndCheckSingleColumnFile(data, values)
+                       data := ps.writeColumn(mem, sc, values)
+                       ps.readAndCheckSingleColumnFile(mem, data, values)
                })
        }
 }
@@ -1116,7 +1173,7 @@ func (ps *ParquetIOTestSuite) 
TestSingleNullableListNullableColumnReadWrite() {
                ps.Run(dt.Name(), func() {
                        expected := prepareListTable(dt, smallSize, true, true, 
10)
                        defer expected.Release()
-                       ps.roundTripTable(expected, false)
+                       ps.roundTripTable(memory.DefaultAllocator, expected, 
false)
                })
        }
 }
@@ -1126,7 +1183,7 @@ func (ps *ParquetIOTestSuite) 
TestSingleRequiredListNullableColumnReadWrite() {
                ps.Run(dt.Name(), func() {
                        expected := prepareListTable(dt, smallSize, false, 
true, 10)
                        defer expected.Release()
-                       ps.roundTripTable(expected, false)
+                       ps.roundTripTable(memory.DefaultAllocator, expected, 
false)
                })
        }
 }
@@ -1136,7 +1193,7 @@ func (ps *ParquetIOTestSuite) 
TestSingleNullableListRequiredColumnReadWrite() {
                ps.Run(dt.Name(), func() {
                        expected := prepareListTable(dt, smallSize, true, 
false, 10)
                        defer expected.Release()
-                       ps.roundTripTable(expected, false)
+                       ps.roundTripTable(memory.DefaultAllocator, expected, 
false)
                })
        }
 }
@@ -1146,7 +1203,7 @@ func (ps *ParquetIOTestSuite) 
TestSingleRequiredListRequiredColumnReadWrite() {
                ps.Run(dt.Name(), func() {
                        expected := prepareListTable(dt, smallSize, false, 
false, 0)
                        defer expected.Release()
-                       ps.roundTripTable(expected, false)
+                       ps.roundTripTable(memory.DefaultAllocator, expected, 
false)
                })
        }
 }
@@ -1156,16 +1213,19 @@ func (ps *ParquetIOTestSuite) 
TestSingleNullableListRequiredListRequiredColumnRe
                ps.Run(dt.Name(), func() {
                        expected := prepareListOfListTable(dt, smallSize, 2, 
true, false, false)
                        defer expected.Release()
-                       ps.roundTripTable(expected, false)
+                       ps.roundTripTable(memory.DefaultAllocator, expected, 
false)
                })
        }
 }
 
 func (ps *ParquetIOTestSuite) TestSimpleStruct() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        links := arrow.StructOf(arrow.Field{Name: "Backward", Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
                arrow.Field{Name: "Forward", Type: arrow.PrimitiveTypes.Int64, 
Nullable: true})
 
-       bldr := array.NewStructBuilder(memory.DefaultAllocator, links)
+       bldr := array.NewStructBuilder(mem, links)
        defer bldr.Release()
 
        backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
@@ -1184,14 +1244,18 @@ func (ps *ParquetIOTestSuite) TestSimpleStruct() {
 
        tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", 
Type: links}}, nil),
                []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", 
Type: links}, arrow.NewChunked(links, []arrow.Array{data}))}, -1)
+       defer data.Release() // NewChunked
        defer tbl.Release()
 
-       ps.roundTripTable(tbl, false)
+       ps.roundTripTable(mem, tbl, false)
 }
 
 func (ps *ParquetIOTestSuite) TestSingleColumnNullableStruct() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        links := arrow.StructOf(arrow.Field{Name: "Backward", Type: 
arrow.PrimitiveTypes.Int64, Nullable: true})
-       bldr := array.NewStructBuilder(memory.DefaultAllocator, links)
+       bldr := array.NewStructBuilder(mem, links)
        defer bldr.Release()
 
        backBldr := bldr.FieldBuilder(0).(*array.Int64Builder)
@@ -1205,14 +1269,18 @@ func (ps *ParquetIOTestSuite) 
TestSingleColumnNullableStruct() {
 
        tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", 
Type: links, Nullable: true}}, nil),
                []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", 
Type: links, Nullable: true}, arrow.NewChunked(links, []arrow.Array{data}))}, 
-1)
+       defer data.Release() // NewChunked
        defer tbl.Release()
 
-       ps.roundTripTable(tbl, false)
+       ps.roundTripTable(mem, tbl, false)
 }
 
 func (ps *ParquetIOTestSuite) TestNestedRequiredFieldStruct() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        intField := arrow.Field{Name: "int_array", Type: 
arrow.PrimitiveTypes.Int32}
-       intBldr := array.NewInt32Builder(memory.DefaultAllocator)
+       intBldr := array.NewInt32Builder(mem)
        defer intBldr.Release()
        intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, nil)
 
@@ -1231,14 +1299,18 @@ func (ps *ParquetIOTestSuite) 
TestNestedRequiredFieldStruct() {
        tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
                []arrow.Column{*arrow.NewColumn(structField,
                        arrow.NewChunked(structField.Type, 
[]arrow.Array{stData}))}, -1)
+       defer stData.Release() // NewChunked
        defer tbl.Release()
 
-       ps.roundTripTable(tbl, false)
+       ps.roundTripTable(mem, tbl, false)
 }
 
 func (ps *ParquetIOTestSuite) TestNestedNullableField() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        intField := arrow.Field{Name: "int_array", Type: 
arrow.PrimitiveTypes.Int32, Nullable: true}
-       intBldr := array.NewInt32Builder(memory.DefaultAllocator)
+       intBldr := array.NewInt32Builder(mem)
        defer intBldr.Release()
        intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, []bool{true, 
false, true, false, true, true, false, true})
 
@@ -1257,12 +1329,16 @@ func (ps *ParquetIOTestSuite) TestNestedNullableField() 
{
        tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
                []arrow.Column{*arrow.NewColumn(structField,
                        arrow.NewChunked(structField.Type, 
[]arrow.Array{stData}))}, -1)
+       defer stData.Release() // NewChunked
        defer tbl.Release()
 
-       ps.roundTripTable(tbl, false)
+       ps.roundTripTable(mem, tbl, false)
 }
 
 func (ps *ParquetIOTestSuite) TestCanonicalNestedRoundTrip() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
        docIdField := arrow.Field{Name: "DocID", Type: 
arrow.PrimitiveTypes.Int64}
        linksField := arrow.Field{Name: "Links", Type: arrow.StructOf(
                arrow.Field{Name: "Backward", Type: 
arrow.ListOf(arrow.PrimitiveTypes.Int64)},
@@ -1278,15 +1354,15 @@ func (ps *ParquetIOTestSuite) 
TestCanonicalNestedRoundTrip() {
        nameField := arrow.Field{Name: "Name", Type: arrow.ListOf(nameStruct)}
        sc := arrow.NewSchema([]arrow.Field{docIdField, linksField, nameField}, 
nil)
 
-       docIDArr, _, err := array.FromJSON(memory.DefaultAllocator, 
docIdField.Type, strings.NewReader("[10, 20]"))
+       docIDArr, _, err := array.FromJSON(mem, docIdField.Type, 
strings.NewReader("[10, 20]"))
        ps.Require().NoError(err)
        defer docIDArr.Release()
 
-       linksIDArr, _, err := array.FromJSON(memory.DefaultAllocator, 
linksField.Type, strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]}, 
{"Backward":[10, 30], "Forward": [80]}]`))
+       linksIDArr, _, err := array.FromJSON(mem, linksField.Type, 
strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]}, {"Backward":[10, 
30], "Forward": [80]}]`))
        ps.Require().NoError(err)
        defer linksIDArr.Release()
 
-       nameArr, _, err := array.FromJSON(memory.DefaultAllocator, 
nameField.Type, strings.NewReader(`
+       nameArr, _, err := array.FromJSON(mem, nameField.Type, 
strings.NewReader(`
                        [[{"Language": [{"Code": "en_us", "Country": "us"},
                                                        {"Code": "en_us", 
"Country": null}],
                           "Url": "http://A"},
@@ -1301,12 +1377,19 @@ func (ps *ParquetIOTestSuite) 
TestCanonicalNestedRoundTrip() {
                *arrow.NewColumn(linksField, arrow.NewChunked(linksField.Type, 
[]arrow.Array{linksIDArr})),
                *arrow.NewColumn(nameField, arrow.NewChunked(nameField.Type, 
[]arrow.Array{nameArr})),
        }, 2)
+       defer docIDArr.Release()   // NewChunked
+       defer linksIDArr.Release() // NewChunked
+       defer nameArr.Release()    // NewChunked
+       defer expected.Release()
 
-       ps.roundTripTable(expected, false)
+       ps.roundTripTable(mem, expected, false)
 }
 
 func (ps *ParquetIOTestSuite) TestFixedSizeList() {
-       bldr := array.NewFixedSizeListBuilder(memory.DefaultAllocator, 3, 
arrow.PrimitiveTypes.Int16)
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       bldr := array.NewFixedSizeListBuilder(mem, 3, 
arrow.PrimitiveTypes.Int16)
        defer bldr.Release()
 
        vb := bldr.ValueBuilder().(*array.Int16Builder)
@@ -1315,15 +1398,24 @@ func (ps *ParquetIOTestSuite) TestFixedSizeList() {
        vb.AppendValues([]int16{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil)
 
        data := bldr.NewArray()
+       defer data.Release() // NewArray
+
        field := arrow.Field{Name: "root", Type: data.DataType(), Nullable: 
true}
-       expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
-               []arrow.Column{*arrow.NewColumn(field, 
arrow.NewChunked(field.Type, []arrow.Array{data}))}, -1)
+       cnk := arrow.NewChunked(field.Type, []arrow.Array{data})
+       defer data.Release() // NewChunked
+
+       tbl := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil), 
[]arrow.Column{*arrow.NewColumn(field, cnk)}, -1)
+       defer cnk.Release() // NewColumn
+       defer tbl.Release()
 
-       ps.roundTripTable(expected, true)
+       ps.roundTripTable(mem, tbl, true)
 }
 
 func (ps *ParquetIOTestSuite) TestNull() {
-       bldr := array.NewNullBuilder(memory.DefaultAllocator)
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       bldr := array.NewNullBuilder(mem)
        defer bldr.Release()
 
        bldr.AppendNull()
@@ -1340,12 +1432,15 @@ func (ps *ParquetIOTestSuite) TestNull() {
                -1,
        )
 
-       ps.roundTripTable(expected, true)
+       ps.roundTripTable(mem, expected, true)
 }
 
 // ARROW-17169
 func (ps *ParquetIOTestSuite) TestNullableListOfStruct() {
-       bldr := array.NewListBuilder(memory.DefaultAllocator, arrow.StructOf(
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       bldr := array.NewListBuilder(mem, arrow.StructOf(
                arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
                arrow.Field{Name: "b", Type: arrow.BinaryTypes.String},
        ))
@@ -1374,13 +1469,17 @@ func (ps *ParquetIOTestSuite) 
TestNullableListOfStruct() {
        field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
        expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
                []arrow.Column{*arrow.NewColumn(field, 
arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
+       defer arr.Release() // NewChunked
        defer expected.Release()
 
-       ps.roundTripTable(expected, false)
+       ps.roundTripTable(mem, expected, false)
 }
 
 func (ps *ParquetIOTestSuite) TestStructWithListOfNestedStructs() {
-       bldr := array.NewStructBuilder(memory.DefaultAllocator, arrow.StructOf(
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       bldr := array.NewStructBuilder(mem, arrow.StructOf(
                arrow.Field{
                        Nullable: true,
                        Name:     "l",
@@ -1421,9 +1520,10 @@ func (ps *ParquetIOTestSuite) 
TestStructWithListOfNestedStructs() {
        field := arrow.Field{Name: "x", Type: arr.DataType(), Nullable: true}
        expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil),
                []arrow.Column{*arrow.NewColumn(field, 
arrow.NewChunked(field.Type, []arrow.Array{arr}))}, -1)
+       defer arr.Release() // NewChunked
        defer expected.Release()
 
-       ps.roundTripTable(expected, false)
+       ps.roundTripTable(mem, expected, false)
 }
 
 func TestParquetArrowIO(t *testing.T) {
@@ -1431,6 +1531,9 @@ func TestParquetArrowIO(t *testing.T) {
 }
 
 func TestBufferedRecWrite(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
        sc := arrow.NewSchema([]arrow.Field{
                {Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: 
true},
                {Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
@@ -1458,7 +1561,7 @@ func TestBufferedRecWrite(t *testing.T) {
 
        wr, err := pqarrow.NewFileWriter(sc, &buf,
                
parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy), 
parquet.WithDictionaryDefault(false), parquet.WithDataPageSize(100*1024)),
-               pqarrow.DefaultWriterProps())
+               pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        require.NoError(t, err)
 
        p1 := rec.NewSlice(0, SIZELEN/2)
@@ -1486,7 +1589,10 @@ func TestBufferedRecWrite(t *testing.T) {
 }
 
 func (ps *ParquetIOTestSuite) TestArrowMapTypeRoundTrip() {
-       bldr := array.NewMapBuilder(memory.DefaultAllocator, 
arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32, false)
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       bldr := array.NewMapBuilder(mem, arrow.BinaryTypes.String, 
arrow.PrimitiveTypes.Int32, false)
        defer bldr.Release()
 
        kb := bldr.KeyBuilder().(*array.StringBuilder)
@@ -1510,15 +1616,124 @@ func (ps *ParquetIOTestSuite) 
TestArrowMapTypeRoundTrip() {
        defer arr.Release()
 
        fld := arrow.Field{Name: "mapped", Type: arr.DataType(), Nullable: true}
-       tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil),
-               []arrow.Column{*arrow.NewColumn(fld, 
arrow.NewChunked(arr.DataType(), []arrow.Array{arr}))}, -1)
+       cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+       defer arr.Release() // NewChunked
+       tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), 
[]arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+       defer cnk.Release() // NewColumn
        defer tbl.Release()
 
-       ps.roundTripTable(tbl, true)
+       ps.roundTripTable(mem, tbl, true)
+}
+
+func (ps *ParquetIOTestSuite) TestArrowExtensionTypeRoundTrip() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       extBuilder := array.NewExtensionBuilder(mem, types.NewUUIDType())
+       defer extBuilder.Release()
+       builder := types.NewUUIDBuilder(extBuilder)
+       builder.Append(uuid.New())
+       arr := builder.NewArray()
+       defer arr.Release()
+
+       fld := arrow.Field{Name: "uuid", Type: arr.DataType(), Nullable: true}
+       cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+       defer arr.Release() // NewChunked
+       tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), 
[]arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+       defer cnk.Release() // NewColumn
+       defer tbl.Release()
+
+       ps.roundTripTable(mem, tbl, true)
+}
+
+func (ps *ParquetIOTestSuite) TestArrowUnknownExtensionTypeRoundTrip() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       var written, expected arrow.Table
+
+       {
+               // Prepare `written` table with the extension type registered.
+               extType := types.NewUUIDType()
+               bldr := array.NewExtensionBuilder(mem, extType)
+               defer bldr.Release()
+
+               bldr.Builder.(*array.FixedSizeBinaryBuilder).AppendValues(
+                       [][]byte{nil, []byte("abcdefghijklmno0"), 
[]byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
+                       []bool{false, true, true, true})
+
+               arr := bldr.NewArray()
+               defer arr.Release()
+
+               if arrow.GetExtensionType("uuid") != nil {
+                       ps.NoError(arrow.UnregisterExtensionType("uuid"))
+               }
+
+               fld := arrow.Field{Name: "uuid", Type: arr.DataType(), 
Nullable: true}
+               cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+               defer arr.Release() // NewChunked
+               written = array.NewTable(arrow.NewSchema([]arrow.Field{fld}, 
nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+               defer cnk.Release() // NewColumn
+               defer written.Release()
+       }
+
+       {
+               // Prepare `expected` table with the extension type 
unregistered in the underlying type.
+               bldr := array.NewFixedSizeBinaryBuilder(mem, 
&arrow.FixedSizeBinaryType{ByteWidth: 16})
+               defer bldr.Release()
+               bldr.AppendValues(
+                       [][]byte{nil, []byte("abcdefghijklmno0"), 
[]byte("abcdefghijklmno1"), []byte("abcdefghijklmno2")},
+                       []bool{false, true, true, true})
+
+               arr := bldr.NewArray()
+               defer arr.Release()
+
+               fld := arrow.Field{Name: "uuid", Type: arr.DataType(), 
Nullable: true}
+               cnk := arrow.NewChunked(arr.DataType(), []arrow.Array{arr})
+               defer arr.Release() // NewChunked
+               expected = array.NewTable(arrow.NewSchema([]arrow.Field{fld}, 
nil), []arrow.Column{*arrow.NewColumn(fld, cnk)}, -1)
+               defer cnk.Release() // NewColumn
+               defer expected.Release()
+       }
+
+       // sanity check before going deeper
+       ps.Equal(expected.NumCols(), written.NumCols())
+       ps.Equal(expected.NumRows(), written.NumRows())
+
+       // just like roundTripTable() but different written vs. expected tables
+       var buf bytes.Buffer
+       props := pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema(), 
pqarrow.WithAllocator(mem))
+
+       writeProps := parquet.NewWriterProperties(parquet.WithAllocator(mem))
+       ps.Require().NoError(pqarrow.WriteTable(written, &buf, 
written.NumRows(), writeProps, props))
+
+       reader := ps.createReader(mem, buf.Bytes())
+       defer reader.ParquetReader().Close()
+
+       tbl := ps.readTable(reader)
+       defer tbl.Release()
+
+       ps.Equal(expected.NumCols(), tbl.NumCols())
+       ps.Equal(expected.NumRows(), tbl.NumRows())
+
+       exChunk := expected.Column(0).Data()
+       tblChunk := tbl.Column(0).Data()
+
+       ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
+       exc := exChunk.Chunk(0)
+       tbc := tblChunk.Chunk(0)
+       ps.Truef(array.Equal(exc, tbc), "expected: %T %s\ngot: %T %s", exc, 
exc, tbc, tbc)
+
+       expectedMd := arrow.MetadataFrom(map[string]string{
+               ipc.ExtensionTypeKeyName:     "uuid",
+               ipc.ExtensionMetadataKeyName: "uuid-serialized",
+               "PARQUET:field_id":           "-1",
+       })
+       ps.Truef(expectedMd.Equal(tbl.Column(0).Field().Metadata), "expected: 
%v\ngot: %v", expectedMd, tbl.Column(0).Field().Metadata)
 }
 
 func TestWriteTableMemoryAllocation(t *testing.T) {
-       allocator := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        sc := arrow.NewSchema([]arrow.Field{
                {Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: 
true},
                {Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
@@ -1526,9 +1741,10 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
                        arrow.Field{Name: "i64", Type: 
arrow.PrimitiveTypes.Int64, Nullable: true},
                        arrow.Field{Name: "f64", Type: 
arrow.PrimitiveTypes.Float64, Nullable: true})},
                {Name: "arr_i64", Type: 
arrow.ListOf(arrow.PrimitiveTypes.Int64)},
+               {Name: "uuid", Type: types.NewUUIDType(), Nullable: true},
        }, nil)
 
-       bld := array.NewRecordBuilder(allocator, sc)
+       bld := array.NewRecordBuilder(mem, sc)
        bld.Field(0).(*array.Float32Builder).Append(1.0)
        bld.Field(1).(*array.Int32Builder).Append(1)
        sbld := bld.Field(2).(*array.StructBuilder)
@@ -1538,6 +1754,7 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
        abld := bld.Field(3).(*array.ListBuilder)
        abld.Append(true)
        abld.ValueBuilder().(*array.Int64Builder).Append(2)
+       
bld.Field(4).(*types.UUIDBuilder).Append(uuid.MustParse("00000000-0000-0000-0000-000000000001"))
 
        rec := bld.NewRecord()
        bld.Release()
@@ -1545,12 +1762,12 @@ func TestWriteTableMemoryAllocation(t *testing.T) {
        var buf bytes.Buffer
        wr, err := pqarrow.NewFileWriter(sc, &buf,
                
parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy)),
-               
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(allocator)))
+               pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
        require.NoError(t, err)
 
        require.NoError(t, wr.Write(rec))
        rec.Release()
        wr.Close()
 
-       require.Zero(t, allocator.CurrentAlloc())
+       require.Zero(t, mem.CurrentAlloc())
 }
diff --git a/go/parquet/pqarrow/encode_dictionary_test.go 
b/go/parquet/pqarrow/encode_dictionary_test.go
index d56f1bdde9..10ad1cd36b 100644
--- a/go/parquet/pqarrow/encode_dictionary_test.go
+++ b/go/parquet/pqarrow/encode_dictionary_test.go
@@ -61,8 +61,8 @@ func (ps *ParquetIOTestSuite) TestSingleColumnOptionalDictionaryWrite() {
                        defer arr.Release()
 
                        sc := ps.makeSimpleSchema(arr.DataType(), parquet.Repetitions.Optional)
-                       data := ps.writeColumn(sc, arr)
-                       ps.readAndCheckSingleColumnFile(data, values)
+                       data := ps.writeColumn(mem, sc, arr)
+                       ps.readAndCheckSingleColumnFile(mem, data, values)
                })
        }
 }
diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go
index f92ac5a3e7..d73f9301e5 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -356,11 +356,9 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
 
        // pass pairs of reader and column index to the channel for the
        // goroutines to read the data
-       for idx, r := range readers {
-               defer func(r *ColumnReader) {
-                       r.Release()
-               }(r)
-               ch <- readerInfo{r, idx}
+       for idx := range readers {
+               defer readers[idx].Release()
+               ch <- readerInfo{readers[idx], idx}
        }
        close(ch)
 
diff --git a/go/parquet/pqarrow/file_writer.go b/go/parquet/pqarrow/file_writer.go
index e4e212eca8..413dcf9ec3 100644
--- a/go/parquet/pqarrow/file_writer.go
+++ b/go/parquet/pqarrow/file_writer.go
@@ -32,7 +32,7 @@ import (
 )
 
 // WriteTable is a convenience function to create and write a full array.Table to a parquet file. The schema
-// and columns will be determined by the schema of the table, writing the file out to the the provided writer.
+// and columns will be determined by the schema of the table, writing the file out to the provided writer.
 // The chunksize will be utilized in order to determine the size of the row groups.
 func WriteTable(tbl arrow.Table, w io.Writer, chunkSize int64, props *parquet.WriterProperties, arrprops ArrowWriterProperties) error {
        writer, err := NewFileWriter(tbl.Schema(), w, props, arrprops)
diff --git a/go/parquet/pqarrow/path_builder.go b/go/parquet/pqarrow/path_builder.go
index c3f9dc5698..7c55dd8417 100644
--- a/go/parquet/pqarrow/path_builder.go
+++ b/go/parquet/pqarrow/path_builder.go
@@ -400,7 +400,7 @@ func (p *pathBuilder) Visit(arr arrow.Array) error {
                p.maybeAddNullable(arr)
                larr := arr.(*array.FixedSizeList)
                listSize := larr.DataType().(*arrow.FixedSizeListType).Len()
-               // technically we could encoded fixed sized lists with two level encodings
+               // technically we could encode fixed sized lists with two level encodings
                // but we always use 3 level encoding, so we increment def levels as well
                p.info.maxDefLevel++
                p.info.maxRepLevel++
@@ -440,7 +440,7 @@ func (p *pathBuilder) Visit(arr arrow.Array) error {
                }
                return nil
        case arrow.EXTENSION:
-               return xerrors.New("extension types not implemented yet")
+               return p.Visit(arr.(array.ExtensionArray).Storage())
        case arrow.SPARSE_UNION, arrow.DENSE_UNION:
                return xerrors.New("union types aren't supported in parquet")
        default:
diff --git a/go/parquet/pqarrow/path_builder_test.go b/go/parquet/pqarrow/path_builder_test.go
index 3dad25dd70..c3ea34ee38 100644
--- a/go/parquet/pqarrow/path_builder_test.go
+++ b/go/parquet/pqarrow/path_builder_test.go
@@ -23,6 +23,8 @@ import (
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
+       "github.com/google/uuid"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
@@ -358,6 +360,52 @@ func TestNestedListsSomeNullsSomeEmpty(t *testing.T) {
        assert.Equal(t, []int16{0, 0, 2, 2, 1, 1, 0, 2}, result.repLevels)
 }
 
+func TestNestedExtensionListsWithSomeNulls(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(t, 0)
+
+       listType := arrow.ListOf(types.NewUUIDType())
+       bldr := array.NewListBuilder(mem, listType)
+       defer bldr.Release()
+
+       nestedBldr := bldr.ValueBuilder().(*array.ListBuilder)
+       vb := nestedBldr.ValueBuilder().(*types.UUIDBuilder)
+
+       uuid1 := uuid.New()
+       uuid3 := uuid.New()
+       uuid4 := uuid.New()
+       uuid5 := uuid.New()
+
+       // produce: [null, [[uuid1, null, uuid3], null, null], [[uuid4, uuid5]]]
+
+       bldr.AppendNull()
+       bldr.Append(true)
+       nestedBldr.Append(true)
+       vb.Append(uuid1)
+       vb.AppendNull()
+       vb.Append(uuid3)
+       nestedBldr.AppendNull()
+       nestedBldr.AppendNull()
+       bldr.Append(true)
+       nestedBldr.Append(true)
+       vb.AppendValues([]uuid.UUID{uuid4, uuid5}, nil)
+
+       arr := bldr.NewListArray()
+       defer arr.Release()
+
+       mp, err := newMultipathLevelBuilder(arr, true)
+       require.NoError(t, err)
+       defer mp.Release()
+
+       ctx := arrowCtxFromContext(NewArrowWriteContext(context.Background(), nil))
+       result, err := mp.write(0, ctx)
+       require.NoError(t, err)
+
+       assert.Equal(t, []int16{0, 5, 4, 5, 2, 2, 5, 5}, result.defLevels)
+       assert.Equal(t, []int16{0, 0, 2, 2, 1, 1, 0, 2}, result.repLevels)
+       assert.Equal(t, result.leafArr.NullN(), 1)
+}
+
 // triplenested translates to parquet:
 //
 // optional group bag {
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index b7813d92e6..3b926242d6 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -24,6 +24,7 @@ import (
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/flight"
+       "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
        "github.com/apache/arrow/go/v12/parquet"
        "github.com/apache/arrow/go/v12/parquet/file"
@@ -354,7 +355,15 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties
                return fieldToNode(name, arrow.Field{Name: name, Type: dictType.ValueType, Nullable: field.Nullable, Metadata: field.Metadata},
                        props, arrprops)
        case arrow.EXTENSION:
-               return nil, xerrors.New("not implemented yet")
+               return fieldToNode(name, arrow.Field{
+                       Name:     name,
+                       Type:     field.Type.(arrow.ExtensionType).StorageType(),
+                       Nullable: field.Nullable,
+                       Metadata: arrow.MetadataFrom(map[string]string{
+                               ipc.ExtensionTypeKeyName:     field.Type.(arrow.ExtensionType).ExtensionName(),
+                               ipc.ExtensionMetadataKeyName: field.Type.(arrow.ExtensionType).Serialize(),
+                       }),
+               }, props, arrprops)
        case arrow.MAP:
                mapType := field.Type.(*arrow.MapType)
                keyNode, err := fieldToNode("key", mapType.KeyField(), props, arrprops)
@@ -948,7 +957,24 @@ func getNestedFactory(origin, inferred arrow.DataType) func(fieldList []arrow.Fi
 func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (modified bool, err error) {
        nchildren := len(inferred.Children)
        switch origin.Type.ID() {
-       case arrow.EXTENSION, arrow.SPARSE_UNION, arrow.DENSE_UNION:
+       case arrow.EXTENSION:
+               extType := origin.Type.(arrow.ExtensionType)
+               modified, err = applyOriginalStorageMetadata(arrow.Field{
+                       Type:     extType.StorageType(),
+                       Metadata: origin.Metadata,
+               }, inferred)
+               if err != nil {
+                       return
+               }
+
+               if !arrow.TypeEqual(extType.StorageType(), inferred.Field.Type) {
+                       return modified, fmt.Errorf("%w: mismatch storage type '%s' for extension type '%s'",
+                               arrow.ErrInvalid, inferred.Field.Type, extType)
+               }
+
+               inferred.Field.Type = extType
+               modified = true
+       case arrow.SPARSE_UNION, arrow.DENSE_UNION:
                err = xerrors.New("unimplemented type")
        case arrow.STRUCT:
                typ := origin.Type.(*arrow.StructType)
@@ -1053,10 +1079,6 @@ func applyOriginalStorageMetadata(origin arrow.Field, inferred *SchemaField) (mo
 }
 
 func applyOriginalMetadata(origin arrow.Field, inferred *SchemaField) (bool, error) {
-       if origin.Type.ID() == arrow.EXTENSION {
-               return false, xerrors.New("extension types not implemented yet")
-       }
-
        return applyOriginalStorageMetadata(origin, inferred)
 }
 
diff --git a/go/parquet/pqarrow/schema_test.go b/go/parquet/pqarrow/schema_test.go
index 07133fb400..219528a2e3 100644
--- a/go/parquet/pqarrow/schema_test.go
+++ b/go/parquet/pqarrow/schema_test.go
@@ -22,7 +22,9 @@ import (
 
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/flight"
+       "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/apache/arrow/go/v12/internal/types"
        "github.com/apache/arrow/go/v12/parquet"
        "github.com/apache/arrow/go/v12/parquet/metadata"
        "github.com/apache/arrow/go/v12/parquet/pqarrow"
@@ -32,13 +34,20 @@ import (
 )
 
 func TestGetOriginSchemaBase64(t *testing.T) {
+       uuidType := types.NewUUIDType()
        md := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})
+       extMd := arrow.NewMetadata([]string{ipc.ExtensionMetadataKeyName, ipc.ExtensionTypeKeyName, "PARQUET:field_id"}, []string{uuidType.Serialize(), uuidType.ExtensionName(), "-1"})
        origArrSc := arrow.NewSchema([]arrow.Field{
                {Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
                {Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+               {Name: "uuid", Type: uuidType, Metadata: extMd},
        }, nil)
 
        arrSerializedSc := flight.SerializeSchema(origArrSc, memory.DefaultAllocator)
+       if err := arrow.RegisterExtensionType(uuidType); err != nil {
+               t.Fatal(err)
+       }
+       defer arrow.UnregisterExtensionType(uuidType.ExtensionName())
        pqschema, err := pqarrow.ToParquet(origArrSc, nil, pqarrow.DefaultWriterProps())
        require.NoError(t, err)
 
@@ -61,6 +70,40 @@ func TestGetOriginSchemaBase64(t *testing.T) {
        }
 }
 
+func TestGetOriginSchemaUnregisteredExtension(t *testing.T) {
+       uuidType := types.NewUUIDType()
+       if err := arrow.RegisterExtensionType(uuidType); err != nil {
+               t.Fatal(err)
+       }
+
+       md := arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"-1"})
+       origArrSc := arrow.NewSchema([]arrow.Field{
+               {Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
+               {Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+               {Name: "uuid", Type: uuidType, Metadata: md},
+       }, nil)
+       pqschema, err := pqarrow.ToParquet(origArrSc, nil, pqarrow.DefaultWriterProps())
+       require.NoError(t, err)
+
+       arrSerializedSc := flight.SerializeSchema(origArrSc, memory.DefaultAllocator)
+       kv := metadata.NewKeyValueMetadata()
+       kv.Append("ARROW:schema", base64.StdEncoding.EncodeToString(arrSerializedSc))
+
+       arrow.UnregisterExtensionType(uuidType.ExtensionName())
+       arrsc, err := pqarrow.FromParquet(pqschema, nil, kv)
+       require.NoError(t, err)
+
+       extMd := arrow.NewMetadata([]string{ipc.ExtensionMetadataKeyName, ipc.ExtensionTypeKeyName, "PARQUET:field_id"},
+               []string{uuidType.Serialize(), uuidType.ExtensionName(), "-1"})
+       expArrSc := arrow.NewSchema([]arrow.Field{
+               {Name: "f1", Type: arrow.BinaryTypes.String, Metadata: md},
+               {Name: "f2", Type: arrow.PrimitiveTypes.Int64, Metadata: md},
+               {Name: "uuid", Type: uuidType.StorageType(), Metadata: extMd},
+       }, nil)
+
+       assert.Truef(t, expArrSc.Equal(arrsc), "expected: %s\ngot: %s", expArrSc, arrsc)
+}
+
 func TestToParquetWriterConfig(t *testing.T) {
        origSc := arrow.NewSchema([]arrow.Field{
                {Name: "f1", Type: arrow.BinaryTypes.String},
diff --git a/testing b/testing
index 47f7b56b25..0c73b9add4 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
+Subproject commit 0c73b9add4e8e30eb9c2690d077fabb269750b27


Reply via email to