This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new fd6a3ab9ef ARROW-16749: [Go] Fix pqarrow writer for null array
fd6a3ab9ef is described below
commit fd6a3ab9ef1830964ab4b0e7f8b3c0f4779bf4e4
Author: alexandreyc <[email protected]>
AuthorDate: Fri Jun 10 13:58:43 2022 -0400
ARROW-16749: [Go] Fix pqarrow writer for null array
Hello world,

When converting from Arrow to Parquet, it looks like there is a bug with
arrays of type `arrow.NULL`. Here is a snippet of code to reproduce the bug:
```go
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/apache/arrow/go/v9/arrow"
	"github.com/apache/arrow/go/v9/arrow/array"
	"github.com/apache/arrow/go/v9/arrow/memory"
	"github.com/apache/arrow/go/v9/parquet/pqarrow"
)

const n = 10

func run() error {
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "f1", Type: arrow.Null, Nullable: true},
		},
		nil,
	)

	rb := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer rb.Release()

	for i := 0; i < n; i++ {
		rb.Field(0).(*array.NullBuilder).AppendNull()
	}

	rec := rb.NewRecord()
	defer rec.Release()

	for i, col := range rec.Columns() {
		fmt.Printf("column[%d] %q: %v\n", i, rec.ColumnName(i), col)
	}

	f, err := os.Create("output.parquet")
	if err != nil {
		return err
	}
	defer f.Close()

	w, err := pqarrow.NewFileWriter(rec.Schema(), f, nil, pqarrow.DefaultWriterProps())
	if err != nil {
		return err
	}
	defer w.Close()

	err = w.Write(rec)
	if err != nil {
		return err
	}

	return nil
}

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}
```
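
As far as I can tell from the diff (this is my reading of it, not something I traced through every generated writer): a `NullArray` carries no value buffer, so the new `arrow.NULL` case hands `WriteBatchSpaced` a nil values slice, and the generated writers had to stop re-slicing with `vals[:info.numSpaced()]`, because re-slicing a nil slice past its zero capacity panics. A minimal standalone sketch of that failure mode (the names `vals` and `numSpaced` are illustrative, not the writer's actual internals):

```go
package main

import "fmt"

func main() {
	// vals stands in for the writer's local slice; it stays nil when the
	// caller passes a nil values slice (as happens for a NullArray).
	var vals []int32
	numSpaced := 10

	defer func() {
		if r := recover(); r != nil {
			// r is: runtime error: slice bounds out of range [:10] with capacity 0
			fmt.Println("recovered:", r)
		}
	}()

	_ = vals[:numSpaced] // panics: a nil slice has capacity 0
}
```

With the fix, the bounded slice `values[valueOffset : valueOffset+info.numSpaced()]` is only taken inside the `values != nil` branch, so a nil `vals` flows through to `writeValuesSpaced` untouched.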
I'm not 100% sure if my fix is the best, so feel free to correct me.
Thanks
A
Closes #13310 from alexandreyc/go-fix-allnull-parquet
Authored-by: alexandreyc <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/parquet/file/column_writer_types.gen.go | 58 ++++++++++------------
go/parquet/file/column_writer_types.gen.go.tmpl | 6 +--
.../gen-go/parquet/GoUnusedProtection__.go | 2 +-
.../internal/gen-go/parquet/parquet-consts.go | 2 +-
go/parquet/internal/gen-go/parquet/parquet.go | 2 +-
go/parquet/metadata/statistics_types.gen.go | 20 ++++----
go/parquet/pqarrow/encode_arrow.go | 3 ++
go/parquet/pqarrow/encode_arrow_test.go | 21 ++++++++
8 files changed, 65 insertions(+), 49 deletions(-)
diff --git a/go/parquet/file/column_writer_types.gen.go b/go/parquet/file/column_writer_types.gen.go
index abbcc732bf..fca500e626 100644
--- a/go/parquet/file/column_writer_types.gen.go
+++ b/go/parquet/file/column_writer_types.gen.go
@@ -20,7 +20,7 @@ package file
 
 import (
 	"fmt"
-
+
 	"github.com/apache/arrow/go/v9/parquet"
 	"github.com/apache/arrow/go/v9/parquet/internal/encoding"
 	format "github.com/apache/arrow/go/v9/parquet/internal/gen-go/parquet"
@@ -74,7 +74,6 @@ func (w *Int32ColumnChunkWriter) WriteBatch(values []int32, defLevels, repLevels
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -133,13 +132,13 @@ func (w *Int32ColumnChunkWriter) WriteBatchSpaced(values []int32, defLevels, rep
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -233,7 +232,6 @@ func (w *Int64ColumnChunkWriter) WriteBatch(values []int64, defLevels, repLevels
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -292,13 +290,13 @@ func (w *Int64ColumnChunkWriter) WriteBatchSpaced(values []int64, defLevels, rep
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -392,7 +390,6 @@ func (w *Int96ColumnChunkWriter) WriteBatch(values []parquet.Int96, defLevels, r
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -451,13 +448,13 @@ func (w *Int96ColumnChunkWriter) WriteBatchSpaced(values []parquet.Int96, defLev
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -551,7 +548,6 @@ func (w *Float32ColumnChunkWriter) WriteBatch(values []float32, defLevels, repLe
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -610,13 +606,13 @@ func (w *Float32ColumnChunkWriter) WriteBatchSpaced(values []float32, defLevels,
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -710,7 +706,6 @@ func (w *Float64ColumnChunkWriter) WriteBatch(values []float64, defLevels, repLe
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -769,13 +764,13 @@ func (w *Float64ColumnChunkWriter) WriteBatchSpaced(values []float64, defLevels,
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -872,7 +867,6 @@ func (w *BooleanColumnChunkWriter) WriteBatch(values []bool, defLevels, repLevel
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -931,13 +925,13 @@ func (w *BooleanColumnChunkWriter) WriteBatchSpaced(values []bool, defLevels, re
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -1031,7 +1025,6 @@ func (w *ByteArrayColumnChunkWriter) WriteBatch(values []parquet.ByteArray, defL
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -1090,13 +1083,13 @@ func (w *ByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.ByteArray
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
@@ -1190,7 +1183,6 @@ func (w *FixedLenByteArrayColumnChunkWriter) WriteBatch(values []parquet.FixedLe
 			}
 		}
 	}()
-
 	// We check for DataPage limits only after we have inserted the values. If a user
 	// writes a large number of values, the DataPage size can be much above the limit.
 	// The purpose of this chunking is to bound this. Even if a user writes large number
@@ -1249,13 +1241,13 @@ func (w *FixedLenByteArrayColumnChunkWriter) WriteBatchSpaced(values []parquet.F
 
 		w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 		if values != nil {
-			vals = values[valueOffset:]
+			vals = values[valueOffset : valueOffset+info.numSpaced()]
 		}
 
 		if w.bitsBuffer != nil {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+			w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 		} else {
-			w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+			w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 		}
 		w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 		valueOffset += info.numSpaced()
diff --git a/go/parquet/file/column_writer_types.gen.go.tmpl b/go/parquet/file/column_writer_types.gen.go.tmpl
index 3f4a9b5a6b..1c4d326519 100644
--- a/go/parquet/file/column_writer_types.gen.go.tmpl
+++ b/go/parquet/file/column_writer_types.gen.go.tmpl
@@ -137,13 +137,13 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatchSpaced(values []{{.name}}, defLev
 
 	w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch))
 	if values != nil {
-		vals = values[valueOffset:]
+		vals = values[valueOffset:valueOffset+info.numSpaced()]
 	}
 
 	if w.bitsBuffer != nil {
-		w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, w.bitsBuffer.Bytes(), 0)
+		w.writeValuesSpaced(vals, info.batchNum, w.bitsBuffer.Bytes(), 0)
 	} else {
-		w.writeValuesSpaced(vals[:info.numSpaced()], info.batchNum, validBits, validBitsOffset+valueOffset)
+		w.writeValuesSpaced(vals, info.batchNum, validBits, validBitsOffset+valueOffset)
 	}
 	w.commitWriteAndCheckPageLimit(batch, info.numSpaced())
 	valueOffset += info.numSpaced()
diff --git a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
index e3025d7a8c..1de0c8dee4 100644
--- a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
+++ b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
@@ -1,4 +1,4 @@
-// Code generated by Thrift Compiler (0.15.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
 
 package parquet
diff --git a/go/parquet/internal/gen-go/parquet/parquet-consts.go b/go/parquet/internal/gen-go/parquet/parquet-consts.go
index 347057e98f..d4a63b22b8 100644
--- a/go/parquet/internal/gen-go/parquet/parquet-consts.go
+++ b/go/parquet/internal/gen-go/parquet/parquet-consts.go
@@ -1,4 +1,4 @@
-// Code generated by Thrift Compiler (0.15.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
 
 package parquet
diff --git a/go/parquet/internal/gen-go/parquet/parquet.go b/go/parquet/internal/gen-go/parquet/parquet.go
index bb1602851d..d4508f8e45 100644
--- a/go/parquet/internal/gen-go/parquet/parquet.go
+++ b/go/parquet/internal/gen-go/parquet/parquet.go
@@ -1,4 +1,4 @@
-// Code generated by Thrift Compiler (0.15.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
 
 package parquet
diff --git a/go/parquet/metadata/statistics_types.gen.go b/go/parquet/metadata/statistics_types.gen.go
index 8d0cde7b4c..5fc2033586 100644
--- a/go/parquet/metadata/statistics_types.gen.go
+++ b/go/parquet/metadata/statistics_types.gen.go
@@ -24,10 +24,10 @@ import (
 	"github.com/apache/arrow/go/v9/arrow"
 	"github.com/apache/arrow/go/v9/arrow/memory"
-	"github.com/apache/arrow/go/v9/internal/utils"
 	"github.com/apache/arrow/go/v9/internal/bitutils"
+	shared_utils "github.com/apache/arrow/go/v9/internal/utils"
 	"github.com/apache/arrow/go/v9/parquet"
-	"github.com/apache/arrow/go/v9/parquet/internal/encoding"
+	"github.com/apache/arrow/go/v9/parquet/internal/encoding"
 	"github.com/apache/arrow/go/v9/parquet/schema"
 	"golang.org/x/xerrors"
 )
@@ -152,9 +152,9 @@ func (s *Int32Statistics) Equals(other TypedStatistics) bool {
 
 func (s *Int32Statistics) getMinMax(values []int32) (min, max int32) {
 	if s.order == schema.SortSIGNED {
-		min, max = utils.GetMinMaxInt32(values)
+		min, max = shared_utils.GetMinMaxInt32(values)
 	} else {
-		umin, umax := utils.GetMinMaxUint32(arrow.Uint32Traits.CastFromBytes(arrow.Int32Traits.CastToBytes(values)))
+		umin, umax := shared_utils.GetMinMaxUint32(arrow.Uint32Traits.CastFromBytes(arrow.Int32Traits.CastToBytes(values)))
 		min, max = int32(umin), int32(umax)
 	}
 	return
@@ -165,10 +165,10 @@ func (s *Int32Statistics) getMinMaxSpaced(values []int32, validBits []byte, vali
 	max = s.defaultMax()
 	var fn func([]int32) (int32, int32)
 	if s.order == schema.SortSIGNED {
-		fn = utils.GetMinMaxInt32
+		fn = shared_utils.GetMinMaxInt32
 	} else {
 		fn = func(v []int32) (int32, int32) {
-			umin, umax := utils.GetMinMaxUint32(arrow.Uint32Traits.CastFromBytes(arrow.Int32Traits.CastToBytes(values)))
+			umin, umax := shared_utils.GetMinMaxUint32(arrow.Uint32Traits.CastFromBytes(arrow.Int32Traits.CastToBytes(values)))
 			return int32(umin), int32(umax)
 		}
 	}
@@ -432,9 +432,9 @@ func (s *Int64Statistics) Equals(other TypedStatistics) bool {
 
 func (s *Int64Statistics) getMinMax(values []int64) (min, max int64) {
 	if s.order == schema.SortSIGNED {
-		min, max = utils.GetMinMaxInt64(values)
+		min, max = shared_utils.GetMinMaxInt64(values)
 	} else {
-		umin, umax := utils.GetMinMaxUint64(arrow.Uint64Traits.CastFromBytes(arrow.Int64Traits.CastToBytes(values)))
+		umin, umax := shared_utils.GetMinMaxUint64(arrow.Uint64Traits.CastFromBytes(arrow.Int64Traits.CastToBytes(values)))
 		min, max = int64(umin), int64(umax)
 	}
 	return
@@ -445,10 +445,10 @@ func (s *Int64Statistics) getMinMaxSpaced(values []int64, validBits []byte, vali
 	max = s.defaultMax()
 	var fn func([]int64) (int64, int64)
 	if s.order == schema.SortSIGNED {
-		fn = utils.GetMinMaxInt64
+		fn = shared_utils.GetMinMaxInt64
 	} else {
 		fn = func(v []int64) (int64, int64) {
-			umin, umax := utils.GetMinMaxUint64(arrow.Uint64Traits.CastFromBytes(arrow.Int64Traits.CastToBytes(values)))
+			umin, umax := shared_utils.GetMinMaxUint64(arrow.Uint64Traits.CastFromBytes(arrow.Int64Traits.CastToBytes(values)))
 			return int64(umin), int64(umax)
 		}
 	}
diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go
index fe3c87a8a0..e662d2bd5e 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -280,6 +280,9 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
 					data[idx] = int32(val) * 1000
 				}
 			}
+		case arrow.NULL:
+			wr.WriteBatchSpaced(nil, defLevels, repLevels, leafArr.NullBitmapBytes(), 0)
+			return
 		default:
 			// simple integral cases, parquet physical storage is int32 or int64
diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go
index c3c9949487..7f799e2338 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -1263,6 +1263,27 @@ func (ps *ParquetIOTestSuite) TestFixedSizeList() {
 	ps.roundTripTable(expected, true)
 }
 
+func (ps *ParquetIOTestSuite) TestNull() {
+	bldr := array.NewNullBuilder(memory.DefaultAllocator)
+	defer bldr.Release()
+
+	bldr.AppendNull()
+	bldr.AppendNull()
+	bldr.AppendNull()
+
+	data := bldr.NewArray()
+	defer data.Release()
+
+	field := arrow.Field{Name: "x", Type: data.DataType(), Nullable: true}
+	expected := array.NewTable(
+		arrow.NewSchema([]arrow.Field{field}, nil),
+		[]arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{data}))},
+		-1,
+	)
+
+	ps.roundTripTable(expected, true)
+}
+
 func TestParquetArrowIO(t *testing.T) {
 	suite.Run(t, new(ParquetIOTestSuite))
 }