This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new bff5fb95ac GH-37582: [Go][Parquet] Implement Float16 logical type
(#37599)
bff5fb95ac is described below
commit bff5fb95ac987354ecd7d69466e11b9f33a67d08
Author: Ben Harkins <[email protected]>
AuthorDate: Mon Nov 13 16:02:49 2023 -0500
GH-37582: [Go][Parquet] Implement Float16 logical type (#37599)
### Rationale for this change
There is an active proposal for a Float16 logical type in Parquet
(https://github.com/apache/parquet-format/pull/184) with C++/Python
implementations in progress (https://github.com/apache/arrow/pull/36073), so we
should add one for Go as well.
### What changes are included in this PR?
- [x] Adds `LogicalType` definitions and methods for `Float16`
- [x] Adds support for `Float16` column statistics and comparators
- [x] Adds support for interchange between Parquet and Arrow's
half-precision float
### Are these changes tested?
Yes
### Are there any user-facing changes?
Yes
* Closes: #37582
Authored-by: benibus <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/float16/float16.go | 46 ++-
go/arrow/float16/float16_test.go | 43 +++
go/parquet/file/column_writer_types.gen.go | 13 +-
go/parquet/file/column_writer_types.gen.go.tmpl | 28 +-
.../gen-go/parquet/GoUnusedProtection__.go | 2 +-
.../internal/gen-go/parquet/parquet-consts.go | 9 +-
go/parquet/internal/gen-go/parquet/parquet.go | 382 ++++++++++++++++++---
go/parquet/internal/testutils/random.go | 23 ++
go/parquet/internal/testutils/random_arrow.go | 23 ++
go/parquet/metadata/statistics.go | 45 ++-
go/parquet/metadata/statistics_test.go | 62 +++-
go/parquet/metadata/statistics_types.gen.go | 315 +++++++++++++++++
go/parquet/metadata/statistics_types.gen.go.tmpl | 47 ++-
go/parquet/metadata/statistics_types.tmpldata | 60 ++++
go/parquet/pqarrow/column_readers.go | 16 +
go/parquet/pqarrow/encode_arrow.go | 25 ++
go/parquet/pqarrow/encode_arrow_test.go | 9 +
go/parquet/pqarrow/schema.go | 6 +
go/parquet/pqarrow/schema_test.go | 20 +-
go/parquet/schema/logical_types.go | 46 +++
go/parquet/schema/logical_types_test.go | 12 +
go/parquet/schema/reflection.go | 6 +
go/parquet/schema/reflection_test.go | 7 +
go/parquet/schema/schema_element_test.go | 9 +-
go/parquet/schema/schema_test.go | 4 +
25 files changed, 1183 insertions(+), 75 deletions(-)
diff --git a/go/arrow/float16/float16.go b/go/arrow/float16/float16.go
index 4e03d13df0..e0192495eb 100644
--- a/go/arrow/float16/float16.go
+++ b/go/arrow/float16/float16.go
@@ -17,6 +17,7 @@
package float16
import (
+ "encoding/binary"
"math"
"strconv"
)
@@ -29,6 +30,11 @@ type Num struct {
bits uint16
}
+var (
+ MaxNum = Num{bits: 0b0111101111111111}
+ MinNum = MaxNum.Negate()
+)
+
// New creates a new half-precision floating point value from the provided
// float32 value.
func New(f float32) Num {
@@ -86,6 +92,11 @@ func (n Num) Div(rhs Num) Num {
return New(n.Float32() / rhs.Float32())
}
+// Equal returns true if the value represented by n is == other
+func (n Num) Equal(other Num) bool {
+ return n.Float32() == other.Float32()
+}
+
// Greater returns true if the value represented by n is > other
func (n Num) Greater(other Num) bool {
return n.Float32() > other.Float32()
@@ -152,14 +163,39 @@ func (n Num) Abs() Num {
}
func (n Num) Sign() int {
- f := n.Float32()
- if f > 0 {
- return 1
- } else if f == 0 {
+ if n.IsZero() {
return 0
+ } else if n.Signbit() {
+ return -1
}
- return -1
+ return 1
}
+func (n Num) Signbit() bool { return (n.bits & 0x8000) != 0 }
+
+func (n Num) IsNaN() bool { return (n.bits & 0x7fff) > 0x7c00 }
+
+func (n Num) IsZero() bool { return (n.bits & 0x7fff) == 0 }
+
func (f Num) Uint16() uint16 { return f.bits }
func (f Num) String() string { return
strconv.FormatFloat(float64(f.Float32()), 'g', -1, 32) }
+
+func Inf() Num { return Num{bits: 0x7c00} }
+
+func NaN() Num { return Num{bits: 0x7fff} }
+
+func FromBits(src uint16) Num { return Num{bits: src} }
+
+func FromLEBytes(src []byte) Num {
+ return Num{bits: binary.LittleEndian.Uint16(src)}
+}
+
+func (f Num) PutLEBytes(dst []byte) {
+ binary.LittleEndian.PutUint16(dst, f.bits)
+}
+
+func (f Num) ToLEBytes() []byte {
+ dst := make([]byte, 2)
+ f.PutLEBytes(dst)
+ return dst
+}
diff --git a/go/arrow/float16/float16_test.go b/go/arrow/float16/float16_test.go
index 55c3ea8b30..cfde440c5f 100644
--- a/go/arrow/float16/float16_test.go
+++ b/go/arrow/float16/float16_test.go
@@ -238,6 +238,7 @@ func TestSign(t *testing.T) {
}{
{Num{bits: 0x4580}, 1}, // 5.5
{Num{bits: 0x0000}, 0}, // 0
+ {Num{bits: 0x8000}, 0}, // -0
{Num{bits: 0xC580}, -1}, // -5.5
} {
t.Run("sign", func(t *testing.T) {
@@ -248,3 +249,45 @@ func TestSign(t *testing.T) {
})
}
}
+
+func TestSignbit(t *testing.T) {
+ for _, tc := range []struct {
+ n Num
+ want bool
+ }{
+ {Num{bits: 0x4580}, false}, // 5.5
+ {Num{bits: 0x0000}, false}, // 0
+ {Num{bits: 0x8000}, true}, // -0
+ {Num{bits: 0xC580}, true}, // -5.5
+ } {
+ t.Run("signbit", func(t *testing.T) {
+ n := tc.n.Signbit()
+ if got, want := n, tc.want; got != want {
+ t.Fatalf("invalid value. got=%v, want=%v", got,
want)
+ }
+ })
+ }
+}
+
+func TestIsNaN(t *testing.T) {
+ for _, tc := range []struct {
+ n Num
+ want bool
+ }{
+ {NaN(), true},
+ {NaN().Negate(), true},
+ {Inf(), false},
+ {Inf().Negate(), false},
+ {Num{bits: 0x7c01}, true}, // nan
+ {Num{bits: 0xfc01}, true}, // -nan
+ {Num{bits: 0x7e00}, true}, // nan
+ {Num{bits: 0xfe00}, true}, // -nan
+ } {
+ t.Run("isnan", func(t *testing.T) {
+ n := tc.n.IsNaN()
+ if got, want := n, tc.want; got != want {
+ t.Fatalf("invalid value. got=%v, want=%v", got,
want)
+ }
+ })
+ }
+}
diff --git a/go/parquet/file/column_writer_types.gen.go
b/go/parquet/file/column_writer_types.gen.go
index 8b3be25f3e..b4d7954639 100644
--- a/go/parquet/file/column_writer_types.gen.go
+++ b/go/parquet/file/column_writer_types.gen.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v15/parquet/internal/encoding"
format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
"github.com/apache/arrow/go/v15/parquet/metadata"
+ "github.com/apache/arrow/go/v15/parquet/schema"
"golang.org/x/xerrors"
)
@@ -1629,7 +1630,11 @@ func (w *FixedLenByteArrayColumnChunkWriter)
WriteDictIndices(indices arrow.Arra
func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values
[]parquet.FixedLenByteArray, numNulls int64) {
w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values)
if w.pageStatistics != nil {
-
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values,
numNulls)
+ if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
+
w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls)
+ } else {
+
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values,
numNulls)
+ }
}
}
@@ -1641,7 +1646,11 @@ func (w *FixedLenByteArrayColumnChunkWriter)
writeValuesSpaced(spacedValues []pa
}
if w.pageStatistics != nil {
nulls := numValues - numRead
-
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+ if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
+
w.pageStatistics.(*metadata.Float16Statistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+ } else {
+
w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+ }
}
}
diff --git a/go/parquet/file/column_writer_types.gen.go.tmpl
b/go/parquet/file/column_writer_types.gen.go.tmpl
index 7df69b4a21..70bcfe679e 100644
--- a/go/parquet/file/column_writer_types.gen.go.tmpl
+++ b/go/parquet/file/column_writer_types.gen.go.tmpl
@@ -18,7 +18,7 @@ package file
import (
"fmt"
-
+
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/metadata"
"github.com/apache/arrow/go/v15/parquet/internal/encoding"
@@ -83,7 +83,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values
[]{{.name}}, defLevels, r
// writes a large number of values, the DataPage size can be much above the
limit.
// The purpose of this chunking is to bound this. Even if a user writes
large number
// of values, the chunking will ensure the AddDataPage() is called at a
reasonable
- // pagesize limit
+ // pagesize limit
var n int64
switch {
case defLevels != nil:
@@ -107,7 +107,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values
[]{{.name}}, defLevels, r
valueOffset += toWrite
w.checkDictionarySizeLimit()
})
- return
+ return
}
// WriteBatchSpaced writes a batch of repetition levels, definition levels,
and values to the
@@ -132,7 +132,7 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteBatchSpaced(values []{{.name}}, defLev
length = len(values)
}
doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64)
{
- var vals []{{.name}}
+ var vals []{{.name}}
info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset,
batch), batch)
w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch),
levelSliceOrNil(repLevels, offset, batch))
@@ -165,7 +165,7 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteDictIndices(indices arrow.Array, defLe
}
}
}()
-
+
valueOffset := int64(0)
length := len(defLevels)
if defLevels == nil {
@@ -193,14 +193,22 @@ func (w *{{.Name}}ColumnChunkWriter)
WriteDictIndices(indices arrow.Array, defLe
valueOffset += info.numSpaced()
})
-
+
return
}
func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls
int64) {
w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values)
if w.pageStatistics != nil {
+{{- if ne .Name "FixedLenByteArray"}}
w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
+{{- else}}
+ if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
+ w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls)
+ } else {
+ w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls)
+ }
+{{- end}}
}
}
@@ -212,7 +220,15 @@ func (w *{{.Name}}ColumnChunkWriter)
writeValuesSpaced(spacedValues []{{.name}},
}
if w.pageStatistics != nil {
nulls := numValues - numRead
+{{- if ne .Name "FixedLenByteArray"}}
w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+{{- else}}
+ if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) {
+
w.pageStatistics.(*metadata.Float16Statistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+ } else {
+
w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues,
validBits, validBitsOffset, nulls)
+ }
+{{- end}}
}
}
diff --git a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
index 1de0c8dee4..01f1eb5aa9 100644
--- a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
+++ b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go
@@ -1,4 +1,4 @@
-// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT.
package parquet
diff --git a/go/parquet/internal/gen-go/parquet/parquet-consts.go
b/go/parquet/internal/gen-go/parquet/parquet-consts.go
index d4a63b22b8..ab0a73c596 100644
--- a/go/parquet/internal/gen-go/parquet/parquet-consts.go
+++ b/go/parquet/internal/gen-go/parquet/parquet-consts.go
@@ -1,21 +1,28 @@
-// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT.
package parquet
import (
"bytes"
"context"
+ "errors"
"fmt"
"time"
thrift "github.com/apache/thrift/lib/go/thrift"
+ "strings"
+ "regexp"
)
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
+var _ = errors.New
var _ = context.Background
var _ = time.Now
var _ = bytes.Equal
+// (needed by validator.)
+var _ = strings.Contains
+var _ = regexp.MatchString
func init() {
diff --git a/go/parquet/internal/gen-go/parquet/parquet.go
b/go/parquet/internal/gen-go/parquet/parquet.go
index d4508f8e45..9dcedae888 100644
--- a/go/parquet/internal/gen-go/parquet/parquet.go
+++ b/go/parquet/internal/gen-go/parquet/parquet.go
@@ -1,4 +1,4 @@
-// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT.
+// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT.
package parquet
@@ -10,14 +10,20 @@ import (
"fmt"
"time"
thrift "github.com/apache/thrift/lib/go/thrift"
+ "strings"
+ "regexp"
)
// (needed to ensure safety because of naive import list construction.)
var _ = thrift.ZERO
var _ = fmt.Printf
+var _ = errors.New
var _ = context.Background
var _ = time.Now
var _ = bytes.Equal
+// (needed by validator.)
+var _ = strings.Contains
+var _ = regexp.MatchString
//Types supported by Parquet. These types are intended to be used in
combination
//with the encodings to control the on disk storage format.
@@ -94,9 +100,10 @@ func (p * Type) Value() (driver.Value, error) {
}
return int64(*p), nil
}
-//Common types used by frameworks(e.g. hive, pig) using parquet. This helps
map
-//between types in those frameworks to the base types in parquet. This is only
-//metadata and not needed to read or write the data.
+//DEPRECATED: Common types used by frameworks(e.g. hive, pig) using parquet.
+//ConvertedType is superseded by LogicalType. This enum should not be
extended.
+//
+//See LogicalTypes.md for conversion between ConvertedType and LogicalType.
type ConvertedType int64
const (
ConvertedType_UTF8 ConvertedType = 0
@@ -897,6 +904,9 @@ func (p *Statistics) String() string {
return fmt.Sprintf("Statistics(%+v)", *p)
}
+func (p *Statistics) Validate() error {
+ return nil
+}
// Empty structs to use as logical type annotations
type StringType struct {
}
@@ -958,6 +968,9 @@ func (p *StringType) String() string {
return fmt.Sprintf("StringType(%+v)", *p)
}
+func (p *StringType) Validate() error {
+ return nil
+}
type UUIDType struct {
}
@@ -1018,6 +1031,9 @@ func (p *UUIDType) String() string {
return fmt.Sprintf("UUIDType(%+v)", *p)
}
+func (p *UUIDType) Validate() error {
+ return nil
+}
type MapType struct {
}
@@ -1078,6 +1094,9 @@ func (p *MapType) String() string {
return fmt.Sprintf("MapType(%+v)", *p)
}
+func (p *MapType) Validate() error {
+ return nil
+}
type ListType struct {
}
@@ -1138,6 +1157,9 @@ func (p *ListType) String() string {
return fmt.Sprintf("ListType(%+v)", *p)
}
+func (p *ListType) Validate() error {
+ return nil
+}
type EnumType struct {
}
@@ -1198,6 +1220,9 @@ func (p *EnumType) String() string {
return fmt.Sprintf("EnumType(%+v)", *p)
}
+func (p *EnumType) Validate() error {
+ return nil
+}
type DateType struct {
}
@@ -1258,6 +1283,72 @@ func (p *DateType) String() string {
return fmt.Sprintf("DateType(%+v)", *p)
}
+func (p *DateType) Validate() error {
+ return nil
+}
+type Float16Type struct {
+}
+
+func NewFloat16Type() *Float16Type {
+ return &Float16Type{}
+}
+
+func (p *Float16Type) Read(ctx context.Context, iprot thrift.TProtocol) error {
+ if _, err := iprot.ReadStructBegin(ctx); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
+ }
+
+
+ for {
+ _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin(ctx)
+ if err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p,
fieldId), err)
+ }
+ if fieldTypeId == thrift.STOP { break; }
+ if err := iprot.Skip(ctx, fieldTypeId); err != nil {
+ return err
+ }
+ if err := iprot.ReadFieldEnd(ctx); err != nil {
+ return err
+ }
+ }
+ if err := iprot.ReadStructEnd(ctx); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p),
err)
+ }
+ return nil
+}
+
+func (p *Float16Type) Write(ctx context.Context, oprot thrift.TProtocol) error
{
+ if err := oprot.WriteStructBegin(ctx, "Float16Type"); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ",
p), err) }
+ if p != nil {
+ }
+ if err := oprot.WriteFieldStop(ctx); err != nil {
+ return thrift.PrependError("write field stop error: ", err) }
+ if err := oprot.WriteStructEnd(ctx); err != nil {
+ return thrift.PrependError("write struct stop error: ", err) }
+ return nil
+}
+
+func (p *Float16Type) Equals(other *Float16Type) bool {
+ if p == other {
+ return true
+ } else if p == nil || other == nil {
+ return false
+ }
+ return true
+}
+
+func (p *Float16Type) String() string {
+ if p == nil {
+ return "<nil>"
+ }
+ return fmt.Sprintf("Float16Type(%+v)", *p)
+}
+
+func (p *Float16Type) Validate() error {
+ return nil
+}
// Logical type to annotate a column that is always null.
//
// Sometimes when discovering the schema of existing data, values are always
@@ -1323,6 +1414,9 @@ func (p *NullType) String() string {
return fmt.Sprintf("NullType(%+v)", *p)
}
+func (p *NullType) Validate() error {
+ return nil
+}
// Decimal logical type annotation
//
// To maintain forward-compatibility in v1, implementations using this logical
@@ -1478,6 +1572,9 @@ func (p *DecimalType) String() string {
return fmt.Sprintf("DecimalType(%+v)", *p)
}
+func (p *DecimalType) Validate() error {
+ return nil
+}
// Time units for logical types
type MilliSeconds struct {
}
@@ -1539,6 +1636,9 @@ func (p *MilliSeconds) String() string {
return fmt.Sprintf("MilliSeconds(%+v)", *p)
}
+func (p *MilliSeconds) Validate() error {
+ return nil
+}
type MicroSeconds struct {
}
@@ -1599,6 +1699,9 @@ func (p *MicroSeconds) String() string {
return fmt.Sprintf("MicroSeconds(%+v)", *p)
}
+func (p *MicroSeconds) Validate() error {
+ return nil
+}
type NanoSeconds struct {
}
@@ -1659,6 +1762,9 @@ func (p *NanoSeconds) String() string {
return fmt.Sprintf("NanoSeconds(%+v)", *p)
}
+func (p *NanoSeconds) Validate() error {
+ return nil
+}
// Attributes:
// - MILLIS
// - MICROS
@@ -1879,6 +1985,9 @@ func (p *TimeUnit) String() string {
return fmt.Sprintf("TimeUnit(%+v)", *p)
}
+func (p *TimeUnit) Validate() error {
+ return nil
+}
// Timestamp logical type annotation
//
// Allowed for physical types: INT64
@@ -2038,6 +2147,9 @@ func (p *TimestampType) String() string {
return fmt.Sprintf("TimestampType(%+v)", *p)
}
+func (p *TimestampType) Validate() error {
+ return nil
+}
// Time logical type annotation
//
// Allowed for physical types: INT32 (millis), INT64 (micros, nanos)
@@ -2197,6 +2309,9 @@ func (p *TimeType) String() string {
return fmt.Sprintf("TimeType(%+v)", *p)
}
+func (p *TimeType) Validate() error {
+ return nil
+}
// Integer logical type annotation
//
// bitWidth must be 8, 16, 32, or 64.
@@ -2352,6 +2467,9 @@ func (p *IntType) String() string {
return fmt.Sprintf("IntType(%+v)", *p)
}
+func (p *IntType) Validate() error {
+ return nil
+}
// Embedded JSON logical type annotation
//
// Allowed for physical types: BINARY
@@ -2415,6 +2533,9 @@ func (p *JsonType) String() string {
return fmt.Sprintf("JsonType(%+v)", *p)
}
+func (p *JsonType) Validate() error {
+ return nil
+}
// Embedded BSON logical type annotation
//
// Allowed for physical types: BINARY
@@ -2478,11 +2599,14 @@ func (p *BsonType) String() string {
return fmt.Sprintf("BsonType(%+v)", *p)
}
+func (p *BsonType) Validate() error {
+ return nil
+}
// LogicalType annotations to replace ConvertedType.
//
// To maintain compatibility, implementations using LogicalType for a
-// SchemaElement must also set the corresponding ConvertedType from the
-// following table.
+// SchemaElement must also set the corresponding ConvertedType (if any)
+// from the following table.
//
// Attributes:
// - STRING
@@ -2498,6 +2622,7 @@ func (p *BsonType) String() string {
// - JSON
// - BSON
// - UUID
+// - FLOAT16
type LogicalType struct {
STRING *StringType `thrift:"STRING,1" db:"STRING" json:"STRING,omitempty"`
MAP *MapType `thrift:"MAP,2" db:"MAP" json:"MAP,omitempty"`
@@ -2513,6 +2638,7 @@ type LogicalType struct {
JSON *JsonType `thrift:"JSON,12" db:"JSON" json:"JSON,omitempty"`
BSON *BsonType `thrift:"BSON,13" db:"BSON" json:"BSON,omitempty"`
UUID *UUIDType `thrift:"UUID,14" db:"UUID" json:"UUID,omitempty"`
+ FLOAT16 *Float16Type `thrift:"FLOAT16,15" db:"FLOAT16"
json:"FLOAT16,omitempty"`
}
func NewLogicalType() *LogicalType {
@@ -2610,6 +2736,13 @@ func (p *LogicalType) GetUUID() *UUIDType {
}
return p.UUID
}
+var LogicalType_FLOAT16_DEFAULT *Float16Type
+func (p *LogicalType) GetFLOAT16() *Float16Type {
+ if !p.IsSetFLOAT16() {
+ return LogicalType_FLOAT16_DEFAULT
+ }
+return p.FLOAT16
+}
func (p *LogicalType) CountSetFieldsLogicalType() int {
count := 0
if (p.IsSetSTRING()) {
@@ -2651,6 +2784,9 @@ func (p *LogicalType) CountSetFieldsLogicalType() int {
if (p.IsSetUUID()) {
count++
}
+ if (p.IsSetFLOAT16()) {
+ count++
+ }
return count
}
@@ -2707,6 +2843,10 @@ func (p *LogicalType) IsSetUUID() bool {
return p.UUID != nil
}
+func (p *LogicalType) IsSetFLOAT16() bool {
+ return p.FLOAT16 != nil
+}
+
func (p *LogicalType) Read(ctx context.Context, iprot thrift.TProtocol) error {
if _, err := iprot.ReadStructBegin(ctx); err != nil {
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -2850,6 +2990,16 @@ func (p *LogicalType) Read(ctx context.Context, iprot
thrift.TProtocol) error {
return err
}
}
+ case 15:
+ if fieldTypeId == thrift.STRUCT {
+ if err := p.ReadField15(ctx, iprot); err != nil {
+ return err
+ }
+ } else {
+ if err := iprot.Skip(ctx, fieldTypeId); err != nil {
+ return err
+ }
+ }
default:
if err := iprot.Skip(ctx, fieldTypeId); err != nil {
return err
@@ -2969,6 +3119,14 @@ func (p *LogicalType) ReadField14(ctx context.Context,
iprot thrift.TProtocol)
return nil
}
+func (p *LogicalType) ReadField15(ctx context.Context, iprot
thrift.TProtocol) error {
+ p.FLOAT16 = &Float16Type{}
+ if err := p.FLOAT16.Read(ctx, iprot); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T error reading struct: ",
p.FLOAT16), err)
+ }
+ return nil
+}
+
func (p *LogicalType) Write(ctx context.Context, oprot thrift.TProtocol) error
{
if c := p.CountSetFieldsLogicalType(); c != 1 {
return fmt.Errorf("%T write union: exactly one field must be set (%d
set)", p, c)
@@ -2989,6 +3147,7 @@ func (p *LogicalType) Write(ctx context.Context, oprot
thrift.TProtocol) error {
if err := p.writeField12(ctx, oprot); err != nil { return err }
if err := p.writeField13(ctx, oprot); err != nil { return err }
if err := p.writeField14(ctx, oprot); err != nil { return err }
+ if err := p.writeField15(ctx, oprot); err != nil { return err }
}
if err := oprot.WriteFieldStop(ctx); err != nil {
return thrift.PrependError("write field stop error: ", err) }
@@ -3166,6 +3325,19 @@ func (p *LogicalType) writeField14(ctx context.Context,
oprot thrift.TProtocol)
return err
}
+func (p *LogicalType) writeField15(ctx context.Context, oprot
thrift.TProtocol) (err error) {
+ if p.IsSetFLOAT16() {
+ if err := oprot.WriteFieldBegin(ctx, "FLOAT16", thrift.STRUCT, 15); err !=
nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field begin error
15:FLOAT16: ", p), err) }
+ if err := p.FLOAT16.Write(ctx, oprot); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T error writing struct: ",
p.FLOAT16), err)
+ }
+ if err := oprot.WriteFieldEnd(ctx); err != nil {
+ return thrift.PrependError(fmt.Sprintf("%T write field end error
15:FLOAT16: ", p), err) }
+ }
+ return err
+}
+
func (p *LogicalType) Equals(other *LogicalType) bool {
if p == other {
return true
@@ -3185,6 +3357,7 @@ func (p *LogicalType) Equals(other *LogicalType) bool {
if !p.JSON.Equals(other.JSON) { return false }
if !p.BSON.Equals(other.BSON) { return false }
if !p.UUID.Equals(other.UUID) { return false }
+ if !p.FLOAT16.Equals(other.FLOAT16) { return false }
return true
}
@@ -3195,6 +3368,9 @@ func (p *LogicalType) String() string {
return fmt.Sprintf("LogicalType(%+v)", *p)
}
+func (p *LogicalType) Validate() error {
+ return nil
+}
// Represents a element inside a schema definition.
// - if it is a group (inner node) then type is undefined and num_children is
defined
// - if it is a primitive type (leaf) then type is defined and num_children
is undefined
@@ -3202,7 +3378,7 @@ func (p *LogicalType) String() string {
//
// Attributes:
// - Type: Data type for this field. Not set if the current element is a
non-leaf node
-// - TypeLength: If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of
the vales.
+// - TypeLength: If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of
the values.
// Otherwise, if specified, this is the maximum bit length to store any of the
values.
// (e.g. a low cardinality INT col could have this set to 3). Note that this
is
// in the schema, and therefore fixed for the entire file.
@@ -3213,10 +3389,14 @@ func (p *LogicalType) String() string {
// the nesting is flattened to a single list by a depth-first traversal.
// The children count is used to construct the nested relationship.
// This field is not set when the element is a primitive type
-// - ConvertedType: When the schema is the result of a conversion from
another model
+// - ConvertedType: DEPRECATED: When the schema is the result of a conversion
from another model.
// Used to record the original type to help with cross conversion.
-// - Scale: Used when this column contains decimal data.
+//
+// This is superseded by logicalType.
+// - Scale: DEPRECATED: Used when this column contains decimal data.
// See the DECIMAL converted type for more details.
+//
+// This is superseded by using the DecimalType annotation in logicalType.
// - Precision
// - FieldID: When the original schema supports field ids, this will save the
// original field id in the parquet schema
@@ -3776,6 +3956,9 @@ func (p *SchemaElement) String() string {
return fmt.Sprintf("SchemaElement(%+v)", *p)
}
+func (p *SchemaElement) Validate() error {
+ return nil
+}
// Data page header
//
// Attributes:
@@ -4059,6 +4242,9 @@ func (p *DataPageHeader) String() string {
return fmt.Sprintf("DataPageHeader(%+v)", *p)
}
+func (p *DataPageHeader) Validate() error {
+ return nil
+}
type IndexPageHeader struct {
}
@@ -4119,6 +4305,14 @@ func (p *IndexPageHeader) String() string {
return fmt.Sprintf("IndexPageHeader(%+v)", *p)
}
+func (p *IndexPageHeader) Validate() error {
+ return nil
+}
+// The dictionary page must be placed at the first position of the column chunk
+// if it is partly or completely dictionary encoded. At most one dictionary
page
+// can be placed in a column chunk.
+//
+//
// Attributes:
// - NumValues: Number of values in the dictionary *
// - Encoding: Encoding using this dictionary page *
@@ -4319,6 +4513,9 @@ func (p *DictionaryPageHeader) String() string {
return fmt.Sprintf("DictionaryPageHeader(%+v)", *p)
}
+func (p *DictionaryPageHeader) Validate() error {
+ return nil
+}
// New page format allowing reading levels without decompressing the data
// Repetition and definition levels are uncompressed
// The remaining section containing the data is compressed if is_compressed is
true
@@ -4738,6 +4935,9 @@ func (p *DataPageHeaderV2) String() string {
return fmt.Sprintf("DataPageHeaderV2(%+v)", *p)
}
+func (p *DataPageHeaderV2) Validate() error {
+ return nil
+}
// Block-based algorithm type annotation. *
type SplitBlockAlgorithm struct {
}
@@ -4799,6 +4999,9 @@ func (p *SplitBlockAlgorithm) String() string {
return fmt.Sprintf("SplitBlockAlgorithm(%+v)", *p)
}
+func (p *SplitBlockAlgorithm) Validate() error {
+ return nil
+}
// The algorithm used in Bloom filter. *
//
// Attributes:
@@ -4923,6 +5126,9 @@ func (p *BloomFilterAlgorithm) String() string {
return fmt.Sprintf("BloomFilterAlgorithm(%+v)", *p)
}
+func (p *BloomFilterAlgorithm) Validate() error {
+ return nil
+}
// Hash strategy type annotation. xxHash is an extremely fast
non-cryptographic hash
// algorithm. It uses 64 bits version of xxHash.
//
@@ -4986,6 +5192,9 @@ func (p *XxHash) String() string {
return fmt.Sprintf("XxHash(%+v)", *p)
}
+func (p *XxHash) Validate() error {
+ return nil
+}
// The hash function used in Bloom filter. This function takes the hash of a
column value
// using plain encoding.
//
@@ -5112,6 +5321,9 @@ func (p *BloomFilterHash) String() string {
return fmt.Sprintf("BloomFilterHash(%+v)", *p)
}
+func (p *BloomFilterHash) Validate() error {
+ return nil
+}
// The compression used in the Bloom filter.
//
type Uncompressed struct {
@@ -5174,6 +5386,9 @@ func (p *Uncompressed) String() string {
return fmt.Sprintf("Uncompressed(%+v)", *p)
}
+func (p *Uncompressed) Validate() error {
+ return nil
+}
// Attributes:
// - UNCOMPRESSED
type BloomFilterCompression struct {
@@ -5296,6 +5511,9 @@ func (p *BloomFilterCompression) String() string {
return fmt.Sprintf("BloomFilterCompression(%+v)", *p)
}
+func (p *BloomFilterCompression) Validate() error {
+ return nil
+}
// Bloom filter header is stored at beginning of Bloom filter data of each
column
// and followed by its bitset.
//
@@ -5553,36 +5771,29 @@ func (p *BloomFilterHeader) String() string {
return fmt.Sprintf("BloomFilterHeader(%+v)", *p)
}
+func (p *BloomFilterHeader) Validate() error {
+ return nil
+}
// Attributes:
// - Type: the type of the page: indicates which of the *_header fields is
set *
// - UncompressedPageSize: Uncompressed page size in bytes (not including
this header) *
// - CompressedPageSize: Compressed (and potentially encrypted) page size in
bytes, not including this header *
-// - Crc: The 32bit CRC for the page, to be be calculated as follows:
-// - Using the standard CRC32 algorithm
-// - On the data only, i.e. this header should not be included. 'Data'
-// hereby refers to the concatenation of the repetition levels, the
-// definition levels and the column value, in this exact order.
-// - On the encoded versions of the repetition levels, definition levels and
-// column values
-// - On the compressed versions of the repetition levels, definition levels
-// and column values where possible;
-// - For v1 data pages, the repetition levels, definition levels and column
-// values are always compressed together. If a compression scheme is
-// specified, the CRC shall be calculated on the compressed version of
-// this concatenation. If no compression scheme is specified, the CRC
-// shall be calculated on the uncompressed version of this concatenation.
-// - For v2 data pages, the repetition levels and definition levels are
-// handled separately from the data and are never compressed (only
-// encoded). If a compression scheme is specified, the CRC shall be
-// calculated on the concatenation of the uncompressed repetition levels,
-// uncompressed definition levels and the compressed column values.
-// If no compression scheme is specified, the CRC shall be calculated on
-// the uncompressed concatenation.
-// - In encrypted columns, CRC is calculated after page encryption; the
-// encryption itself is performed after page compression (if compressed)
+// - Crc: The 32-bit CRC checksum for the page, to be calculated as
follows:
+//
+// - The standard CRC32 algorithm is used (with polynomial 0x04C11DB7,
+// the same as in e.g. GZip).
+// - All page types can have a CRC (v1 and v2 data pages, dictionary pages,
+// etc.).
+// - The CRC is computed on the serialization binary representation of the page
+// (as written to disk), excluding the page header. For example, for v1
+// data pages, the CRC is computed on the concatenation of repetition levels,
+// definition levels and column values (optionally compressed, optionally
+// encrypted).
+// - The CRC computation therefore takes place after any compression
+// and encryption steps, if any.
+//
// If enabled, this allows for disabling checksumming in HDFS if only a few
// pages need to be read.
-//
// - DataPageHeader
// - IndexPageHeader
// - DictionaryPageHeader
@@ -6006,6 +6217,9 @@ func (p *PageHeader) String() string {
return fmt.Sprintf("PageHeader(%+v)", *p)
}
+func (p *PageHeader) Validate() error {
+ return nil
+}
// Wrapper struct to store key values
//
// Attributes:
@@ -6165,6 +6379,9 @@ func (p *KeyValue) String() string {
return fmt.Sprintf("KeyValue(%+v)", *p)
}
+func (p *KeyValue) Validate() error {
+ return nil
+}
// Wrapper struct to specify sort order
//
// Attributes:
@@ -6358,6 +6575,9 @@ func (p *SortingColumn) String() string {
return fmt.Sprintf("SortingColumn(%+v)", *p)
}
+func (p *SortingColumn) Validate() error {
+ return nil
+}
// statistics of a given page type and encoding
//
// Attributes:
@@ -6552,6 +6772,9 @@ func (p *PageEncodingStats) String() string {
return fmt.Sprintf("PageEncodingStats(%+v)", *p)
}
+func (p *PageEncodingStats) Validate() error {
+ return nil
+}
// Description for column metadata
//
// Attributes:
@@ -7346,6 +7569,9 @@ func (p *ColumnMetaData) String() string {
return fmt.Sprintf("ColumnMetaData(%+v)", *p)
}
+func (p *ColumnMetaData) Validate() error {
+ return nil
+}
type EncryptionWithFooterKey struct {
}
@@ -7406,6 +7632,9 @@ func (p *EncryptionWithFooterKey) String() string {
return fmt.Sprintf("EncryptionWithFooterKey(%+v)", *p)
}
+func (p *EncryptionWithFooterKey) Validate() error {
+ return nil
+}
// Attributes:
// - PathInSchema: Column path in schema *
// - KeyMetadata: Retrieval metadata of column encryption key *
@@ -7581,6 +7810,9 @@ func (p *EncryptionWithColumnKey) String() string {
return fmt.Sprintf("EncryptionWithColumnKey(%+v)", *p)
}
+func (p *EncryptionWithColumnKey) Validate() error {
+ return nil
+}
// Attributes:
// - ENCRYPTION_WITH_FOOTER_KEY
// - ENCRYPTION_WITH_COLUMN_KEY
@@ -7752,6 +7984,9 @@ func (p *ColumnCryptoMetaData) String() string {
return fmt.Sprintf("ColumnCryptoMetaData(%+v)", *p)
}
+func (p *ColumnCryptoMetaData) Validate() error {
+ return nil
+}
// Attributes:
// - FilePath: File where column data is stored. If not set, assumed to be
same file as
// metadata. This path is relative to the current file.
@@ -8254,6 +8489,9 @@ func (p *ColumnChunk) String() string {
return fmt.Sprintf("ColumnChunk(%+v)", *p)
}
+func (p *ColumnChunk) Validate() error {
+ return nil
+}
// Attributes:
// - Columns: Metadata for each column chunk in this row group.
// This list must have the same order as the SchemaElement list in
FileMetaData.
@@ -8694,6 +8932,9 @@ func (p *RowGroup) String() string {
return fmt.Sprintf("RowGroup(%+v)", *p)
}
+func (p *RowGroup) Validate() error {
+ return nil
+}
// Empty struct to signal the order defined by the physical or logical type
type TypeDefinedOrder struct {
}
@@ -8755,6 +8996,9 @@ func (p *TypeDefinedOrder) String() string {
return fmt.Sprintf("TypeDefinedOrder(%+v)", *p)
}
+func (p *TypeDefinedOrder) Validate() error {
+ return nil
+}
// Union to specify the order used for the min_value and max_value fields for a
// column. This union takes the role of an enhanced enum that allows rich
// elements (which will be needed for a collation-based ordering in the
future).
@@ -8808,6 +9052,13 @@ func (p *TypeDefinedOrder) String() string {
// - If the min is +0, the row group may contain -0 values as well.
// - If the max is -0, the row group may contain +0 values as well.
// - When looking for NaN values, min and max should be ignored.
+//
+// When writing statistics the following rules should be followed:
+// - NaNs should not be written to min or max statistics fields.
+// - If the computed max value is zero (whether negative or positive),
+// `+0.0` should be written into the max statistics field.
+// - If the computed min value is zero (whether negative or positive),
+// `-0.0` should be written into the min statistics field.
type ColumnOrder struct {
TYPE_ORDER *TypeDefinedOrder `thrift:"TYPE_ORDER,1" db:"TYPE_ORDER"
json:"TYPE_ORDER,omitempty"`
}
@@ -8928,6 +9179,9 @@ func (p *ColumnOrder) String() string {
return fmt.Sprintf("ColumnOrder(%+v)", *p)
}
+func (p *ColumnOrder) Validate() error {
+ return nil
+}
// Attributes:
// - Offset: Offset of the page in the file *
// - CompressedPageSize: Size of the page, including header. Sum of
compressed_page_size and header
@@ -9120,6 +9374,9 @@ func (p *PageLocation) String() string {
return fmt.Sprintf("PageLocation(%+v)", *p)
}
+func (p *PageLocation) Validate() error {
+ return nil
+}
// Attributes:
// - PageLocations: PageLocations, ordered by increasing PageLocation.offset.
It is required
// that page_locations[i].first_row_index <
page_locations[i+1].first_row_index.
@@ -9251,6 +9508,9 @@ func (p *OffsetIndex) String() string {
return fmt.Sprintf("OffsetIndex(%+v)", *p)
}
+func (p *OffsetIndex) Validate() error {
+ return nil
+}
// Description for ColumnIndex.
// Each <array-field>[i] refers to the page at OffsetIndex.page_locations[i]
//
@@ -9260,15 +9520,16 @@ func (p *OffsetIndex) String() string {
// have to set the corresponding entries in min_values and max_values to
// byte[0], so that all lists have the same length. If false, the
// corresponding entries in min_values and max_values must be valid.
-// - MinValues: Two lists containing lower and upper bounds for the values of
each page.
-// These may be the actual minimum and maximum values found on a page, but
-// can also be (more compact) values that do not exist on a page. For
-// example, instead of storing ""Blart Versenwald III", a writer may set
-// min_values[i]="B", max_values[i]="C". Such more compact values must still
-// be valid values within the column's logical type. Readers must make sure
-// that list entries are populated before using them by inspecting null_pages.
+// - MinValues: Two lists containing lower and upper bounds for the values of
each page
+// determined by the ColumnOrder of the column. These may be the actual
+// minimum and maximum values found on a page, but can also be (more compact)
+// values that do not exist on a page. For example, instead of storing ""Blart
+// Versenwald III", a writer may set min_values[i]="B", max_values[i]="C".
+// Such more compact values must still be valid values within the column's
+// logical type. Readers must make sure that list entries are populated before
+// using them by inspecting null_pages.
// - MaxValues
-// - BoundaryOrder: Stores whether both min_values and max_values are orderd
and if so, in
+// - BoundaryOrder: Stores whether both min_values and max_values are ordered
and if so, in
// which direction. This allows readers to perform binary searches in both
// lists. Readers cannot assume that max_values[i] <= min_values[i+1], even
// if the lists are ordered.
@@ -9644,6 +9905,9 @@ func (p *ColumnIndex) String() string {
return fmt.Sprintf("ColumnIndex(%+v)", *p)
}
+func (p *ColumnIndex) Validate() error {
+ return nil
+}
// Attributes:
// - AadPrefix: AAD prefix *
// - AadFileUnique: Unique file identifier part of AAD suffix *
@@ -9848,6 +10112,9 @@ func (p *AesGcmV1) String() string {
return fmt.Sprintf("AesGcmV1(%+v)", *p)
}
+func (p *AesGcmV1) Validate() error {
+ return nil
+}
// Attributes:
// - AadPrefix: AAD prefix *
// - AadFileUnique: Unique file identifier part of AAD suffix *
@@ -10052,6 +10319,9 @@ func (p *AesGcmCtrV1) String() string {
return fmt.Sprintf("AesGcmCtrV1(%+v)", *p)
}
+func (p *AesGcmCtrV1) Validate() error {
+ return nil
+}
// Attributes:
// - AES_GCM_V1
// - AES_GCM_CTR_V1
@@ -10223,6 +10493,9 @@ func (p *EncryptionAlgorithm) String() string {
return fmt.Sprintf("EncryptionAlgorithm(%+v)", *p)
}
+func (p *EncryptionAlgorithm) Validate() error {
+ return nil
+}
// Description for file metadata
//
// Attributes:
@@ -10240,17 +10513,20 @@ func (p *EncryptionAlgorithm) String() string {
// <Application> version <App Version> (build <App Build Hash>).
// e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55)
//
-// - ColumnOrders: Sort order used for the min_value and max_value fields of
each column in
-// this file. Sort orders are listed in the order matching the columns in the
-// schema. The indexes are not necessary the same though, because only leaf
-// nodes of the schema are represented in the list of sort orders.
+// - ColumnOrders: Sort order used for the min_value and max_value fields in
the Statistics
+// objects and the min_values and max_values fields in the ColumnIndex
+// objects of each column in this file. Sort orders are listed in the order
+// matching the columns in the schema. The indexes are not necessary the same
+// though, because only leaf nodes of the schema are represented in the list
+// of sort orders.
//
-// Without column_orders, the meaning of the min_value and max_value fields is
-// undefined. To ensure well-defined behaviour, if min_value and max_value are
-// written to a Parquet file, column_orders must be written as well.
+// Without column_orders, the meaning of the min_value and max_value fields
+// in the Statistics object and the ColumnIndex object is undefined. To ensure
+// well-defined behaviour, if these fields are written to a Parquet file,
+// column_orders must be written as well.
//
-// The obsolete min and max fields are always sorted by signed comparison
-// regardless of column_orders.
+// The obsolete min and max fields in the Statistics object are always sorted
+// by signed comparison regardless of column_orders.
// - EncryptionAlgorithm: Encryption algorithm. This field is set only in
encrypted files
// with plaintext footer. Files with encrypted footer store algorithm id
// in FileCryptoMetaData structure.
@@ -10803,6 +11079,9 @@ func (p *FileMetaData) String() string {
return fmt.Sprintf("FileMetaData(%+v)", *p)
}
+func (p *FileMetaData) Validate() error {
+ return nil
+}
// Crypto metadata for files with encrypted footer *
//
// Attributes:
@@ -10965,3 +11244,6 @@ func (p *FileCryptoMetaData) String() string {
return fmt.Sprintf("FileCryptoMetaData(%+v)", *p)
}
+func (p *FileCryptoMetaData) Validate() error {
+ return nil
+}
diff --git a/go/parquet/internal/testutils/random.go
b/go/parquet/internal/testutils/random.go
index bb101ebf9a..bb9ee0cdf2 100644
--- a/go/parquet/internal/testutils/random.go
+++ b/go/parquet/internal/testutils/random.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/arrow/go/v15/arrow/array"
"github.com/apache/arrow/go/v15/arrow/bitutil"
"github.com/apache/arrow/go/v15/arrow/endian"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/pqarrow"
@@ -369,6 +370,17 @@ func randFloat64(r *rand.Rand) float64 {
}
}
+// randFloat16 creates a random float16 value from uniformly random bit
+// patterns, rejecting NaN values so that only comparable numbers are
returned.
+func randFloat16(r *rand.Rand) float16.Num {
+ for {
+ f := float16.FromBits(uint16(r.Uint64n(math.MaxUint16 + 1)))
+ if !f.IsNaN() {
+ return f
+ }
+ }
+}
+
// FillRandomFloat32 populates out with random float32 values using seed as
the random
// seed for the generator to allow consistency for testing.
func FillRandomFloat32(seed uint64, out []float32) {
@@ -387,6 +399,15 @@ func FillRandomFloat64(seed uint64, out []float64) {
}
}
+// FillRandomFloat16 populates out with random float16 values using seed as
the random
+// seed for the generator to allow consistency for testing.
+func FillRandomFloat16(seed uint64, out []float16.Num) {
+ r := rand.New(rand.NewSource(seed))
+ for idx := range out {
+ out[idx] = randFloat16(r)
+ }
+}
+
// FillRandomByteArray populates out with random ByteArray values with lengths
between 2 and 12
// using heap as the actual memory storage used for the bytes generated. Each
element of
// out will be some slice of the bytes in heap, and as such heap must outlive
the byte array slices.
@@ -456,6 +477,8 @@ func InitValues(values interface{}, heap *memory.Buffer) {
FillRandomFloat32(0, arr)
case []float64:
FillRandomFloat64(0, arr)
+ case []float16.Num:
+ FillRandomFloat16(0, arr)
case []parquet.Int96:
FillRandomInt96(0, arr)
case []parquet.ByteArray:
diff --git a/go/parquet/internal/testutils/random_arrow.go
b/go/parquet/internal/testutils/random_arrow.go
index d886db0360..7dd2a3e8b7 100644
--- a/go/parquet/internal/testutils/random_arrow.go
+++ b/go/parquet/internal/testutils/random_arrow.go
@@ -19,6 +19,7 @@ package testutils
import (
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/arrow/memory"
"golang.org/x/exp/rand"
)
@@ -49,6 +50,13 @@ func RandomNonNull(mem memory.Allocator, dt arrow.DataType,
size int) arrow.Arra
FillRandomFloat64(0, values)
bldr.AppendValues(values, nil)
return bldr.NewArray()
+ case arrow.FLOAT16:
+ bldr := array.NewFloat16Builder(mem)
+ defer bldr.Release()
+ values := make([]float16.Num, size)
+ FillRandomFloat16(0, values)
+ bldr.AppendValues(values, nil)
+ return bldr.NewArray()
case arrow.INT64:
bldr := array.NewInt64Builder(mem)
defer bldr.Release()
@@ -212,6 +220,21 @@ func RandomNullable(dt arrow.DataType, size int, numNulls
int) arrow.Array {
values := make([]float64, size)
FillRandomFloat64(0, values)
+ valid := make([]bool, size)
+ for idx := range valid {
+ valid[idx] = true
+ }
+ for i := 0; i < numNulls; i++ {
+ valid[i*2] = false
+ }
+ bldr.AppendValues(values, valid)
+ return bldr.NewArray()
+ case arrow.FLOAT16:
+ bldr := array.NewFloat16Builder(memory.DefaultAllocator)
+ defer bldr.Release()
+ values := make([]float16.Num, size)
+ FillRandomFloat16(0, values)
+
valid := make([]bool, size)
for idx := range valid {
valid[idx] = true
diff --git a/go/parquet/metadata/statistics.go
b/go/parquet/metadata/statistics.go
index 606e76ffd0..43294272de 100644
--- a/go/parquet/metadata/statistics.go
+++ b/go/parquet/metadata/statistics.go
@@ -23,6 +23,7 @@ import (
"unsafe"
"github.com/apache/arrow/go/v15/arrow"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/internal/utils"
"github.com/apache/arrow/go/v15/parquet"
@@ -32,7 +33,7 @@ import (
"github.com/apache/arrow/go/v15/parquet/schema"
)
-//go:generate go run ../../arrow/_tools/tmpl/main.go -i
-data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl
+//go:generate go run ../../arrow/_tools/tmpl/main.go -i
-data=statistics_types.tmpldata statistics_types.gen.go.tmpl
type StatProvider interface {
GetMin() []byte
@@ -373,6 +374,9 @@ var (
defaultMinUInt96 parquet.Int96
defaultMaxInt96 parquet.Int96
defaultMaxUInt96 parquet.Int96
+
+ defaultMinFloat16 parquet.FixedLenByteArray = float16.MaxNum.ToLEBytes()
+ defaultMaxFloat16 parquet.FixedLenByteArray = float16.MinNum.ToLEBytes()
)
func init() {
@@ -407,6 +411,14 @@ func (s *Int96Statistics) defaultMax() parquet.Int96 {
return defaultMaxInt96
}
+func (Float16Statistics) defaultMin() parquet.FixedLenByteArray {
+ return defaultMinFloat16
+}
+
+func (Float16Statistics) defaultMax() parquet.FixedLenByteArray {
+ return defaultMaxFloat16
+}
+
func (Float32Statistics) defaultMin() float32 {
return math.MaxFloat32 }
func (Float32Statistics) defaultMax() float32 {
return -math.MaxFloat32 }
func (Float64Statistics) defaultMin() float64 {
return math.MaxFloat64 }
@@ -427,6 +439,10 @@ func (FixedLenByteArrayStatistics) equal(a, b
parquet.FixedLenByteArray) bool {
return bytes.Equal(a, b)
}
+func (Float16Statistics) equal(a, b parquet.FixedLenByteArray) bool {
+ return float16.FromLEBytes(a).Equal(float16.FromLEBytes(b))
+}
+
func (BooleanStatistics) less(a, b bool) bool {
return !a && b
}
@@ -481,6 +497,10 @@ func (s *FixedLenByteArrayStatistics) less(a, b
parquet.FixedLenByteArray) bool
return signedByteLess([]byte(a), []byte(b))
}
+func (Float16Statistics) less(a, b parquet.FixedLenByteArray) bool {
+ return float16.FromLEBytes(a).Less(float16.FromLEBytes(b))
+}
+
func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean)
*minmaxPairBoolean { return &minMax }
func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32
{ return &minMax }
func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64
{ return &minMax }
@@ -535,6 +555,29 @@ func (Float64Statistics) cleanStat(minMax
minmaxPairFloat64) *minmaxPairFloat64
return &minMax
}
+func (Float16Statistics) cleanStat(minMax minmaxPairFloat16)
*minmaxPairFloat16 {
+ min := float16.FromLEBytes(minMax[0][:])
+ max := float16.FromLEBytes(minMax[1][:])
+
+ if min.IsNaN() || max.IsNaN() {
+ return nil
+ }
+
+ if min.Equal(float16.MaxNum) && max.Equal(float16.MinNum) {
+ return nil
+ }
+
+ zero := float16.New(0)
+ if min.Equal(zero) && !min.Signbit() {
+ minMax[0] = min.Negate().ToLEBytes()
+ }
+ if max.Equal(zero) && max.Signbit() {
+ minMax[1] = max.Negate().ToLEBytes()
+ }
+
+ return &minMax
+}
+
func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray)
*minmaxPairByteArray {
if minMax[0] == nil || minMax[1] == nil {
return nil
diff --git a/go/parquet/metadata/statistics_test.go
b/go/parquet/metadata/statistics_test.go
index 47798d3c4d..19311dc895 100644
--- a/go/parquet/metadata/statistics_test.go
+++ b/go/parquet/metadata/statistics_test.go
@@ -22,6 +22,7 @@ import (
"testing"
"github.com/apache/arrow/go/v15/arrow/bitutil"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/metadata"
@@ -32,24 +33,36 @@ import (
// NOTE(zeroshade): tests will be added and updated after merging the "file"
package
// since the tests that I wrote relied on the file writer/reader for ease of
use.
+func newFloat16Node(name string, rep parquet.Repetition, fieldID int32)
*schema.PrimitiveNode {
+ return schema.MustPrimitive(schema.NewPrimitiveNodeLogical(name, rep,
schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID))
+}
+
func TestCheckNaNs(t *testing.T) {
const (
numvals = 8
min = -4.0
max = 3.0
)
- nan := math.NaN()
+ var (
+ nan = math.NaN()
+ f16Min parquet.FixedLenByteArray =
float16.New(float32(min)).ToLEBytes()
+ f16Max parquet.FixedLenByteArray =
float16.New(float32(max)).ToLEBytes()
+ )
allNans := []float64{nan, nan, nan, nan, nan, nan, nan, nan}
allNansf32 := make([]float32, numvals)
+ allNansf16 := make([]parquet.FixedLenByteArray, numvals)
for idx, v := range allNans {
allNansf32[idx] = float32(v)
+ allNansf16[idx] = float16.New(float32(v)).ToLEBytes()
}
someNans := []float64{nan, max, -3.0, -1.0, nan, 2.0, min, nan}
someNansf32 := make([]float32, numvals)
+ someNansf16 := make([]parquet.FixedLenByteArray, numvals)
for idx, v := range someNans {
someNansf32[idx] = float32(v)
+ someNansf16[idx] = float16.New(float32(v)).ToLEBytes()
}
validBitmap := []byte{0x7F} // 0b01111111
@@ -62,6 +75,8 @@ func TestCheckNaNs(t *testing.T) {
s.Update(values.([]float32), 0)
case *metadata.Float64Statistics:
s.Update(values.([]float64), 0)
+ case *metadata.Float16Statistics:
+ s.Update(values.([]parquet.FixedLenByteArray),
0)
}
assert.False(t, stats.HasMinMax())
} else {
@@ -72,6 +87,8 @@ func TestCheckNaNs(t *testing.T) {
s.UpdateSpaced(values.([]float32), bitmap, 0,
int64(nullCount))
case *metadata.Float64Statistics:
s.UpdateSpaced(values.([]float64), bitmap, 0,
int64(nullCount))
+ case *metadata.Float16Statistics:
+
s.UpdateSpaced(values.([]parquet.FixedLenByteArray), bitmap, 0,
int64(nullCount))
}
assert.False(t, stats.HasMinMax())
}
@@ -89,6 +106,11 @@ func TestCheckNaNs(t *testing.T) {
assert.True(t, stats.HasMinMax())
assert.Equal(t, expectedMin, s.Min())
assert.Equal(t, expectedMax, s.Max())
+ case *metadata.Float16Statistics:
+ s.Update(values.([]parquet.FixedLenByteArray), 0)
+ assert.True(t, stats.HasMinMax())
+ assert.Equal(t, expectedMin, s.Min())
+ assert.Equal(t, expectedMax, s.Max())
}
}
@@ -106,34 +128,48 @@ func TestCheckNaNs(t *testing.T) {
assert.True(t, s.HasMinMax())
assert.Equal(t, expectedMin, s.Min())
assert.Equal(t, expectedMax, s.Max())
+ case *metadata.Float16Statistics:
+ s.UpdateSpaced(values.([]parquet.FixedLenByteArray),
bitmap, 0, int64(nullCount))
+ assert.True(t, s.HasMinMax())
+ assert.Equal(t, expectedMin, s.Min())
+ assert.Equal(t, expectedMax, s.Max())
}
}
f32Col := schema.NewColumn(schema.NewFloat32Node("f",
parquet.Repetitions.Optional, -1), 1, 1)
f64Col := schema.NewColumn(schema.NewFloat64Node("f",
parquet.Repetitions.Optional, -1), 1, 1)
+ f16Col := schema.NewColumn(newFloat16Node("f",
parquet.Repetitions.Required, -1), 1, 1)
// test values
someNanStats := metadata.NewStatistics(f64Col, memory.DefaultAllocator)
someNanStatsf32 := metadata.NewStatistics(f32Col,
memory.DefaultAllocator)
+ someNanStatsf16 := metadata.NewStatistics(f16Col,
memory.DefaultAllocator)
// ingesting only nans should not yield a min or max
assertUnsetMinMax(someNanStats, allNans, nil)
assertUnsetMinMax(someNanStatsf32, allNansf32, nil)
+ assertUnsetMinMax(someNanStatsf16, allNansf16, nil)
// ingesting a mix should yield a valid min/max
assertMinMaxAre(someNanStats, someNans, min, max)
assertMinMaxAre(someNanStatsf32, someNansf32, float32(min),
float32(max))
+ assertMinMaxAre(someNanStatsf16, someNansf16, f16Min, f16Max)
// ingesting only nans after a valid min/max should have no effect
assertMinMaxAre(someNanStats, allNans, min, max)
assertMinMaxAre(someNanStatsf32, allNansf32, float32(min), float32(max))
+ assertMinMaxAre(someNanStatsf16, allNansf16, f16Min, f16Max)
someNanStats = metadata.NewStatistics(f64Col, memory.DefaultAllocator)
someNanStatsf32 = metadata.NewStatistics(f32Col,
memory.DefaultAllocator)
+ someNanStatsf16 = metadata.NewStatistics(f16Col,
memory.DefaultAllocator)
assertUnsetMinMax(someNanStats, allNans, validBitmap)
assertUnsetMinMax(someNanStatsf32, allNansf32, validBitmap)
+ assertUnsetMinMax(someNanStatsf16, allNansf16, validBitmap)
// nans should not pollute min/max when excluded via null bitmap
assertMinMaxAreSpaced(someNanStats, someNans, validBitmapNoNaNs, min,
max)
assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmapNoNaNs,
float32(min), float32(max))
+ assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmapNoNaNs,
f16Min, f16Max)
// ingesting nans with a null bitmap should not change the result
assertMinMaxAreSpaced(someNanStats, someNans, validBitmap, min, max)
assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmap,
float32(min), float32(max))
+ assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmap,
f16Min, f16Max)
}
func TestCheckNegativeZeroStats(t *testing.T) {
@@ -155,37 +191,61 @@ func TestCheckNegativeZeroStats(t *testing.T) {
assert.True(t, math.Signbit(s.Min()))
assert.Equal(t, zero, s.Max())
assert.False(t, math.Signbit(s.Max()))
+ case *metadata.Float16Statistics:
+ s.Update(values.([]parquet.FixedLenByteArray), 0)
+ assert.True(t, s.HasMinMax())
+ var zero float64
+ min := float64(float16.FromLEBytes(s.Min()).Float32())
+ max := float64(float16.FromLEBytes(s.Max()).Float32())
+ assert.Equal(t, zero, min)
+ assert.True(t, math.Signbit(min))
+ assert.Equal(t, zero, max)
+ assert.False(t, math.Signbit(max))
}
}
fcol := schema.NewColumn(schema.NewFloat32Node("f",
parquet.Repetitions.Optional, -1), 1, 1)
dcol := schema.NewColumn(schema.NewFloat64Node("d",
parquet.Repetitions.Optional, -1), 1, 1)
+ hcol := schema.NewColumn(newFloat16Node("h",
parquet.Repetitions.Optional, -1), 1, 1)
var f32zero float32
var f64zero float64
+ var f16PosZero parquet.FixedLenByteArray =
float16.New(+f32zero).ToLEBytes()
+ var f16NegZero parquet.FixedLenByteArray =
float16.New(-f32zero).ToLEBytes()
+
+ assert.False(t, float16.FromLEBytes(f16PosZero).Signbit())
+ assert.True(t, float16.FromLEBytes(f16NegZero).Signbit())
{
fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator)
dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator)
+ hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator)
assertMinMaxZeroesSign(fstats, []float32{-f32zero, f32zero})
assertMinMaxZeroesSign(dstats, []float64{-f64zero, f64zero})
+ assertMinMaxZeroesSign(hstats,
[]parquet.FixedLenByteArray{f16NegZero, f16PosZero})
}
{
fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator)
dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator)
+ hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator)
assertMinMaxZeroesSign(fstats, []float32{f32zero, -f32zero})
assertMinMaxZeroesSign(dstats, []float64{f64zero, -f64zero})
+ assertMinMaxZeroesSign(hstats,
[]parquet.FixedLenByteArray{f16PosZero, f16NegZero})
}
{
fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator)
dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator)
+ hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator)
assertMinMaxZeroesSign(fstats, []float32{-f32zero, -f32zero})
assertMinMaxZeroesSign(dstats, []float64{-f64zero, -f64zero})
+ assertMinMaxZeroesSign(hstats,
[]parquet.FixedLenByteArray{f16NegZero, f16NegZero})
}
{
fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator)
dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator)
+ hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator)
assertMinMaxZeroesSign(fstats, []float32{f32zero, f32zero})
assertMinMaxZeroesSign(dstats, []float64{f64zero, f64zero})
+ assertMinMaxZeroesSign(hstats,
[]parquet.FixedLenByteArray{f16PosZero, f16PosZero})
}
}
diff --git a/go/parquet/metadata/statistics_types.gen.go
b/go/parquet/metadata/statistics_types.gen.go
index e8fb9877c8..baecd185d1 100644
--- a/go/parquet/metadata/statistics_types.gen.go
+++ b/go/parquet/metadata/statistics_types.gen.go
@@ -24,6 +24,7 @@ import (
"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/arrow/memory"
"github.com/apache/arrow/go/v15/internal/bitutils"
shared_utils "github.com/apache/arrow/go/v15/internal/utils"
@@ -2432,6 +2433,314 @@ func (s *FixedLenByteArrayStatistics) Encode() (enc
EncodedStatistics, err error
return
}
+type minmaxPairFloat16 [2]parquet.FixedLenByteArray
+
+// Float16Statistics is the typed interface for managing stats for a column
+// of Float16 type.
+type Float16Statistics struct {
+ statistics
+ min parquet.FixedLenByteArray
+ max parquet.FixedLenByteArray
+
+ bitSetReader bitutils.SetBitRunReader
+}
+
+// NewFloat16Statistics constructs an appropriate stat object type using the
+// given column descriptor and allocator.
+//
+// Panics if the physical type of descr is not parquet.Type.FixedLenByteArray
+// Panics if the logical type of descr is not schema.Float16LogicalType
+func NewFloat16Statistics(descr *schema.Column, mem memory.Allocator)
*Float16Statistics {
+ if descr.PhysicalType() != parquet.Types.FixedLenByteArray {
+ panic(fmt.Errorf("parquet: invalid type %s for constructing a
Float16 stat object", descr.PhysicalType()))
+ }
+ if !descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ panic(fmt.Errorf("parquet: invalid logical type %s for
constructing a Float16 stat object", descr.LogicalType().String()))
+ }
+
+ return &Float16Statistics{
+ statistics: statistics{
+ descr: descr,
+ hasNullCount: true,
+ hasDistinctCount: true,
+ order: descr.SortOrder(),
+ encoder:
encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false,
descr, mem),
+ mem: mem,
+ },
+ }
+}
+
+// NewFloat16StatisticsFromEncoded will construct a propertly typed statistics
object
+// initializing it with the provided information.
+func NewFloat16StatisticsFromEncoded(descr *schema.Column, mem
memory.Allocator, nvalues int64, encoded StatProvider) *Float16Statistics {
+ ret := NewFloat16Statistics(descr, mem)
+ ret.nvalues += nvalues
+ if encoded.IsSetNullCount() {
+ ret.IncNulls(encoded.GetNullCount())
+ }
+ if encoded.IsSetDistinctCount() {
+ ret.IncDistinct(encoded.GetDistinctCount())
+ }
+
+ encodedMin := encoded.GetMin()
+ if encodedMin != nil && len(encodedMin) > 0 {
+ ret.min = ret.plainDecode(encodedMin)
+ }
+ encodedMax := encoded.GetMax()
+ if encodedMax != nil && len(encodedMax) > 0 {
+ ret.max = ret.plainDecode(encodedMax)
+ }
+ ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin()
+ return ret
+}
+
+func (s *Float16Statistics) plainEncode(src parquet.FixedLenByteArray) []byte {
+
s.encoder.(encoding.FixedLenByteArrayEncoder).Put([]parquet.FixedLenByteArray{src})
+ buf, err := s.encoder.FlushValues()
+ if err != nil {
+ panic(err) // recovered by Encode
+ }
+ defer buf.Release()
+
+ out := make([]byte, buf.Len())
+ copy(out, buf.Bytes())
+ return out
+}
+
+func (s *Float16Statistics) plainDecode(src []byte) parquet.FixedLenByteArray {
+ var buf [1]parquet.FixedLenByteArray
+
+ decoder := encoding.NewDecoder(s.descr.PhysicalType(),
parquet.Encodings.Plain, s.descr, s.mem)
+ decoder.SetData(1, src)
+ decoder.(encoding.FixedLenByteArrayDecoder).Decode(buf[:])
+ return buf[0]
+}
+
+func (s *Float16Statistics) minval(a, b parquet.FixedLenByteArray)
parquet.FixedLenByteArray {
+ switch {
+ case a == nil:
+ return b
+ case b == nil:
+ return a
+ case s.less(a, b):
+ return a
+ default:
+ return b
+ }
+}
+
+func (s *Float16Statistics) maxval(a, b parquet.FixedLenByteArray)
parquet.FixedLenByteArray {
+ switch {
+ case a == nil:
+ return b
+ case b == nil:
+ return a
+ case s.less(a, b):
+ return b
+ default:
+ return a
+ }
+}
+
+// MinMaxEqual returns true if both stat objects have the same Min and Max
values
+func (s *Float16Statistics) MinMaxEqual(rhs *Float16Statistics) bool {
+ return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max)
+}
+
+// Equals returns true only if both objects are the same type, have the same
min and
+// max values, null count, distinct count and number of values.
+func (s *Float16Statistics) Equals(other TypedStatistics) bool {
+ if s.Type() != other.Type() ||
!s.descr.LogicalType().Equals(other.Descr().LogicalType()) {
+ return false
+ }
+ rhs, ok := other.(*Float16Statistics)
+ if !ok {
+ return false
+ }
+
+ if s.HasMinMax() != rhs.HasMinMax() {
+ return false
+ }
+ return (s.hasMinMax && s.MinMaxEqual(rhs)) &&
+ s.NullCount() == rhs.NullCount() &&
+ s.DistinctCount() == rhs.DistinctCount() &&
+ s.NumValues() == rhs.NumValues()
+}
+
+func (s *Float16Statistics) coalesce(val, fallback parquet.FixedLenByteArray)
parquet.FixedLenByteArray {
+ if float16.FromLEBytes(val).IsNaN() {
+ return fallback
+ }
+ return val
+}
+
+func (s *Float16Statistics) getMinMax(values []parquet.FixedLenByteArray)
(min, max parquet.FixedLenByteArray) {
+ defMin := s.defaultMin()
+ defMax := s.defaultMax()
+
+ min = defMin
+ max = defMax
+
+ for _, v := range values {
+ min = s.minval(min, s.coalesce(v, defMin))
+ max = s.maxval(max, s.coalesce(v, defMax))
+ }
+ return
+}
+
+func (s *Float16Statistics) getMinMaxSpaced(values
[]parquet.FixedLenByteArray, validBits []byte, validBitsOffset int64) (min, max
parquet.FixedLenByteArray) {
+ min = s.defaultMin()
+ max = s.defaultMax()
+
+ if s.bitSetReader == nil {
+ s.bitSetReader = bitutils.NewSetBitRunReader(validBits,
validBitsOffset, int64(len(values)))
+ } else {
+ s.bitSetReader.Reset(validBits, validBitsOffset,
int64(len(values)))
+ }
+
+ for {
+ run := s.bitSetReader.NextRun()
+ if run.Length == 0 {
+ break
+ }
+ for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] {
+ min = s.minval(min, coalesce(v,
s.defaultMin()).(parquet.FixedLenByteArray))
+ max = s.maxval(max, coalesce(v,
s.defaultMax()).(parquet.FixedLenByteArray))
+ }
+ }
+ return
+}
+
+func (s *Float16Statistics) Min() parquet.FixedLenByteArray { return s.min }
+func (s *Float16Statistics) Max() parquet.FixedLenByteArray { return s.max }
+
+// Merge merges the stats from other into this stat object, updating
+// the null count, distinct count, number of values and the min/max if
+// appropriate.
+func (s *Float16Statistics) Merge(other TypedStatistics) {
+ rhs, ok := other.(*Float16Statistics)
+ if !ok {
+ panic("incompatible stat type merge")
+ }
+
+ s.statistics.merge(rhs)
+ if rhs.HasMinMax() {
+ s.SetMinMax(rhs.Min(), rhs.Max())
+ }
+}
+
+// Update is used to add more values to the current stat object, finding the
+// min and max values etc.
+func (s *Float16Statistics) Update(values []parquet.FixedLenByteArray, numNull
int64) {
+ s.IncNulls(numNull)
+ s.nvalues += int64(len(values))
+
+ if len(values) == 0 {
+ return
+ }
+
+ s.SetMinMax(s.getMinMax(values))
+}
+
+// UpdateSpaced is just like Update, but for spaced values using validBits to
determine
+// and skip null values.
+func (s *Float16Statistics) UpdateSpaced(values []parquet.FixedLenByteArray,
validBits []byte, validBitsOffset, numNull int64) {
+ s.IncNulls(numNull)
+ notnull := int64(len(values)) - numNull
+ s.nvalues += notnull
+
+ if notnull == 0 {
+ return
+ }
+
+ s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset))
+}
+
+func (s *Float16Statistics) UpdateFromArrow(values arrow.Array, updateCounts
bool) error {
+ if updateCounts {
+ s.IncNulls(int64(values.NullN()))
+ s.nvalues += int64(values.Len() - values.NullN())
+ }
+
+ if values.NullN() == values.Len() {
+ return nil
+ }
+
+ return fmt.Errorf("%w: update float16 stats from Arrow",
arrow.ErrNotImplemented)
+}
+
+// SetMinMax updates the min and max values only if they are not currently set
+// or if argMin is less than the current min / argMax is greater than the
current max
+func (s *Float16Statistics) SetMinMax(argMin, argMax
parquet.FixedLenByteArray) {
+ maybeMinMax := s.cleanStat([2]parquet.FixedLenByteArray{argMin, argMax})
+ if maybeMinMax == nil {
+ return
+ }
+
+ min := (*maybeMinMax)[0]
+ max := (*maybeMinMax)[1]
+
+ if !s.hasMinMax {
+ s.hasMinMax = true
+ s.min = min
+ s.max = max
+ } else {
+ if !s.less(s.min, min) {
+ s.min = min
+ }
+ if s.less(s.max, max) {
+ s.max = max
+ }
+ }
+}
+
+// EncodeMin returns the encoded min value with plain encoding.
+//
+// ByteArray stats do not include the length in the encoding.
+func (s *Float16Statistics) EncodeMin() []byte {
+ if s.HasMinMax() {
+ return s.plainEncode(s.min)
+ }
+ return nil
+}
+
+// EncodeMax returns the current encoded max value with plain encoding
+//
+// ByteArray stats do not include the length in the encoding
+func (s *Float16Statistics) EncodeMax() []byte {
+ if s.HasMinMax() {
+ return s.plainEncode(s.max)
+ }
+ return nil
+}
+
+// Encode returns a populated EncodedStatistics object
+func (s *Float16Statistics) Encode() (enc EncodedStatistics, err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ switch r := r.(type) {
+ case error:
+ err = r
+ case string:
+ err = xerrors.New(r)
+ default:
+ err = fmt.Errorf("unknown error type thrown
from panic: %v", r)
+ }
+ }
+ }()
+ if s.HasMinMax() {
+ enc.SetMax(s.EncodeMax())
+ enc.SetMin(s.EncodeMin())
+ }
+ if s.HasNullCount() {
+ enc.SetNullCount(s.NullCount())
+ }
+ if s.HasDistinctCount() {
+ enc.SetDistinctCount(s.DistinctCount())
+ }
+ return
+}
+
// NewStatistics uses the type in the column descriptor to construct the
appropriate
// typed stats object. If mem is nil, then memory.DefaultAllocator will be
used.
func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics
{
@@ -2454,6 +2763,9 @@ func NewStatistics(descr *schema.Column, mem
memory.Allocator) TypedStatistics {
case parquet.Types.ByteArray:
return NewByteArrayStatistics(descr, mem)
case parquet.Types.FixedLenByteArray:
+ if descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ return NewFloat16Statistics(descr, mem)
+ }
return NewFixedLenByteArrayStatistics(descr, mem)
default:
panic("not implemented")
@@ -2484,6 +2796,9 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem
memory.Allocator, nvalue
case parquet.Types.ByteArray:
return NewByteArrayStatisticsFromEncoded(descr, mem, nvalues,
encoded)
case parquet.Types.FixedLenByteArray:
+ if descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ return NewFloat16StatisticsFromEncoded(descr, mem,
nvalues, encoded)
+ }
return NewFixedLenByteArrayStatisticsFromEncoded(descr, mem,
nvalues, encoded)
default:
panic("not implemented")
diff --git a/go/parquet/metadata/statistics_types.gen.go.tmpl
b/go/parquet/metadata/statistics_types.gen.go.tmpl
index 4b6253a857..93495527c7 100644
--- a/go/parquet/metadata/statistics_types.gen.go.tmpl
+++ b/go/parquet/metadata/statistics_types.gen.go.tmpl
@@ -45,10 +45,18 @@ type {{.Name}}Statistics struct {
// given column descriptor and allocator.
//
// Panics if the physical type of descr is not parquet.Type.{{if
.physical}}{{.physical}}{{else}}{{.Name}}{{end}}
+{{- if eq .Name "Float16"}}
+// Panics if the logical type of descr is not schema.Float16LogicalType
+{{- end}}
func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator)
*{{.Name}}Statistics {
if descr.PhysicalType() != parquet.Types.{{if
.physical}}{{.physical}}{{else}}{{.Name}}{{end}} {
panic(fmt.Errorf("parquet: invalid type %s for constructing a {{.Name}}
stat object", descr.PhysicalType()))
}
+{{- if eq .Name "Float16"}}
+ if !descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ panic(fmt.Errorf("parquet: invalid logical type %s for constructing a
{{.Name}} stat object", descr.LogicalType().String()))
+ }
+{{- end}}
return &{{.Name}}Statistics{
statistics: statistics{
@@ -96,7 +104,7 @@ func (s *{{.Name}}Statistics) plainEncode(src {{.name}})
[]byte {
copy(out, src)
return out
{{- else}}
- s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src})
+ s.encoder.(encoding.{{if
.logical}}{{.physical}}{{else}}{{.Name}}{{end}}Encoder).Put([]{{.name}}{src})
buf, err := s.encoder.FlushValues()
if err != nil {
panic(err) // recovered by Encode
@@ -117,12 +125,12 @@ func (s *{{.Name}}Statistics) plainDecode(src []byte)
{{.name}} {
decoder := encoding.NewDecoder(s.descr.PhysicalType(),
parquet.Encodings.Plain, s.descr, s.mem)
decoder.SetData(1, src)
- decoder.(encoding.{{.Name}}Decoder).Decode(buf[:])
+ decoder.(encoding.{{if
.logical}}{{.physical}}{{else}}{{.Name}}{{end}}Decoder).Decode(buf[:])
return buf[0]
{{- end}}
}
-{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}}
+{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray") (ne .Name
"Float16")}}
func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} {
if s.less(a, b) {
return a
@@ -172,7 +180,11 @@ func (s *{{.Name}}Statistics) MinMaxEqual(rhs
*{{.Name}}Statistics) bool {
// Equals returns true only if both objects are the same type, have the same
min and
// max values, null count, distinct count and number of values.
func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool {
+{{- if .logical}}
+ if s.Type() != other.Type() ||
!s.descr.LogicalType().Equals(other.Descr().LogicalType()) {
+{{- else}}
if s.Type() != other.Type() {
+{{- end}}
return false
}
rhs, ok := other.(*{{.Name}}Statistics)
@@ -194,6 +206,13 @@ func (s *{{.Name}}Statistics) coalesce(val, fallback
{{.name}}) {{.name}} {
}
return val
}
+{{else if eq .Name "Float16"}}
+func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} {
+ if float16.FromLEBytes(val).IsNaN() {
+ return fallback
+ }
+ return val
+}
{{end}}
func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max
{{.name}}) {
@@ -212,7 +231,7 @@ func (s *{{.Name}}Statistics) getMinMax(values []{{.name}})
(min, max {{.name}})
max = defMax
for _, v := range values {
-{{- if or (eq .name "float32") (eq .name "float64") }}
+{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }}
min = s.minval(min, s.coalesce(v, defMin))
max = s.maxval(max, s.coalesce(v, defMax))
{{- else}}
@@ -261,7 +280,7 @@ func (s *{{.Name}}Statistics) getMinMaxSpaced(values
[]{{.name}}, validBits []by
}
{{- else}}
for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] {
-{{- if or (eq .name "float32") (eq .name "float64") }}
+{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }}
min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}}))
max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}}))
{{- else}}
@@ -381,7 +400,9 @@ func (s *{{.Name}}Statistics) UpdateFromArrow(values
arrow.Array, updateCounts b
s.SetMinMax(min, max)
return nil
{{else if eq .Name "Boolean"}}
- return fmt.Errorf("%w: update boolean stats from Arrow",
arrow.ErrNotImplemented)
+ return fmt.Errorf("%w: update boolean stats from Arrow",
arrow.ErrNotImplemented)
+{{else if eq .Name "Float16"}}
+ return fmt.Errorf("%w: update float16 stats from Arrow",
arrow.ErrNotImplemented)
{{else}}
if values.DataType().(arrow.FixedWidthDataType).Bytes() !=
arrow.{{.Name}}SizeBytes {
return fmt.Errorf("%w: cannot update {{.name}} stats with %s arrow array",
@@ -475,8 +496,15 @@ func NewStatistics(descr *schema.Column, mem
memory.Allocator) TypedStatistics {
}
switch descr.PhysicalType() {
{{- range .In}}
+ {{- if not .logical}}
case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
+ {{- if eq .Name "FixedLenByteArray"}}
+ if descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ return NewFloat16Statistics(descr, mem)
+ }
+ {{- end}}
return New{{.Name}}Statistics(descr, mem)
+ {{- end}}
{{- end}}
default:
panic("not implemented")
@@ -493,8 +521,15 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem
memory.Allocator, nvalue
}
switch descr.PhysicalType() {
{{- range .In}}
+ {{- if not .logical}}
case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}:
+ {{- if eq .Name "FixedLenByteArray"}}
+ if descr.LogicalType().Equals(schema.Float16LogicalType{}) {
+ return NewFloat16StatisticsFromEncoded(descr, mem, nvalues, encoded)
+ }
+ {{- end}}
return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded)
+ {{- end}}
{{- end}}
default:
panic("not implemented")
diff --git a/go/parquet/metadata/statistics_types.tmpldata
b/go/parquet/metadata/statistics_types.tmpldata
new file mode 100644
index 0000000000..400c0a3ca5
--- /dev/null
+++ b/go/parquet/metadata/statistics_types.tmpldata
@@ -0,0 +1,60 @@
+[
+ {
+ "Name": "Int32",
+ "name": "int32",
+ "lower": "int32",
+ "prefix": "arrow"
+ },
+ {
+ "Name": "Int64",
+ "name": "int64",
+ "lower": "int64",
+ "prefix": "arrow"
+ },
+ {
+ "Name": "Int96",
+ "name": "parquet.Int96",
+ "lower": "int96",
+ "prefix": "parquet"
+ },
+ {
+ "Name": "Float32",
+ "name": "float32",
+ "lower": "float32",
+ "prefix": "arrow",
+ "physical": "Float"
+ },
+ {
+ "Name": "Float64",
+ "name": "float64",
+ "lower": "float64",
+ "prefix": "arrow",
+ "physical": "Double"
+ },
+ {
+ "Name": "Boolean",
+ "name": "bool",
+ "lower": "bool",
+ "prefix": "arrow"
+ },
+ {
+ "Name": "ByteArray",
+ "name": "parquet.ByteArray",
+ "lower": "byteArray",
+ "prefix": "parquet"
+ },
+ {
+ "Name": "FixedLenByteArray",
+ "name": "parquet.FixedLenByteArray",
+ "lower": "fixedLenByteArray",
+ "prefix": "parquet"
+ },
+ {
+ "Name": "Float16",
+ "name": "parquet.FixedLenByteArray",
+ "lower": "float16",
+ "prefix": "parquet",
+ "physical": "FixedLenByteArray",
+ "logical": "Float16LogicalType"
+ }
+]
diff --git a/go/parquet/pqarrow/column_readers.go
b/go/parquet/pqarrow/column_readers.go
index 02f94c941c..49f3fac0a3 100644
--- a/go/parquet/pqarrow/column_readers.go
+++ b/go/parquet/pqarrow/column_readers.go
@@ -517,6 +517,14 @@ func transferColumnData(rdr file.RecordReader, valueType
arrow.DataType, descr *
default:
return nil, errors.New("time unit not supported")
}
+ case arrow.FLOAT16:
+ if descr.PhysicalType() != parquet.Types.FixedLenByteArray {
+ return nil, errors.New("physical type for float16 must
be fixed len byte array")
+ }
+ if len := arrow.Float16SizeBytes; descr.TypeLength() != len {
+ return nil, fmt.Errorf("fixed len byte array length for
float16 must be %d", len)
+ }
+ return transferBinary(rdr, valueType), nil
default:
return nil, fmt.Errorf("no support for reading columns of type:
%s", valueType.Name())
}
@@ -563,6 +571,14 @@ func transferBinary(rdr file.RecordReader, dt
arrow.DataType) *arrow.Chunked {
chunks[idx] = array.MakeFromData(chunk.Data())
chunk.Release()
}
+ case *arrow.Float16Type:
+ for idx, chunk := range chunks {
+ data := chunk.Data()
+ f16_data := array.NewData(dt, data.Len(),
data.Buffers(), nil, data.NullN(), data.Offset())
+ defer f16_data.Release()
+ chunks[idx] = array.NewFloat16Data(f16_data)
+ chunk.Release()
+ }
}
return arrow.NewChunked(dt, chunks)
}
diff --git a/go/parquet/pqarrow/encode_arrow.go
b/go/parquet/pqarrow/encode_arrow.go
index 81b4527b10..1855d3625a 100644
--- a/go/parquet/pqarrow/encode_arrow.go
+++ b/go/parquet/pqarrow/encode_arrow.go
@@ -582,6 +582,31 @@ func writeDenseArrow(ctx *arrowWriteContext, cw
file.ColumnChunkWriter, leafArr
}
wr.WriteBatchSpaced(data, defLevels, repLevels,
arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
+ case *arrow.Float16Type:
+ typeLen := wr.Descr().TypeLength()
+ if typeLen != arrow.Float16SizeBytes {
+ return fmt.Errorf("%w: invalid
FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid,
typeLen)
+ }
+
+ arr := leafArr.(*array.Float16)
+ rawValues :=
arrow.Float16Traits.CastToBytes(arr.Values())
+ data := make([]parquet.FixedLenByteArray, arr.Len())
+
+ if arr.NullN() == 0 {
+ for idx := range data {
+ offset := idx * typeLen
+ data[idx] = rawValues[offset :
offset+typeLen]
+ }
+ _, err = wr.WriteBatch(data, defLevels,
repLevels)
+ } else {
+ for idx := range data {
+ if arr.IsValid(idx) {
+ offset := idx * typeLen
+ data[idx] = rawValues[offset :
offset+typeLen]
+ }
+ }
+ wr.WriteBatchSpaced(data, defLevels, repLevels,
arr.NullBitmapBytes(), int64(arr.Data().Offset()))
+ }
default:
return fmt.Errorf("%w: invalid column type to write to
FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
diff --git a/go/parquet/pqarrow/encode_arrow_test.go
b/go/parquet/pqarrow/encode_arrow_test.go
index 281ca0d526..d588aff701 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -495,6 +495,8 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType {
return schema.DateLogicalType{}
case arrow.DATE64:
return schema.DateLogicalType{}
+ case arrow.FLOAT16:
+ return schema.Float16LogicalType{}
case arrow.TIMESTAMP:
ts := typ.(*arrow.TimestampType)
adjustedUTC := len(ts.TimeZone) == 0
@@ -541,6 +543,8 @@ func getPhysicalType(typ arrow.DataType) parquet.Type {
return parquet.Types.Float
case arrow.FLOAT64:
return parquet.Types.Double
+ case arrow.FLOAT16:
+ return parquet.Types.FixedLenByteArray
case arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING:
return parquet.Types.ByteArray
case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL:
@@ -600,6 +604,8 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ
arrow.DataType, rep parquet.R
byteWidth = int32(typ.ByteWidth)
case arrow.DecimalType:
byteWidth = pqarrow.DecimalSize(typ.GetPrecision())
+ case *arrow.Float16Type:
+ byteWidth = int32(typ.Bytes())
case *arrow.DictionaryType:
valuesType := typ.ValueType
switch dt := valuesType.(type) {
@@ -607,6 +613,8 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ
arrow.DataType, rep parquet.R
byteWidth = int32(dt.ByteWidth)
case arrow.DecimalType:
byteWidth = pqarrow.DecimalSize(dt.GetPrecision())
+ case *arrow.Float16Type:
+ byteWidth = int32(typ.Bytes())
}
}
@@ -1113,6 +1121,7 @@ var fullTypeList = []arrow.DataType{
arrow.FixedWidthTypes.Date32,
arrow.PrimitiveTypes.Float32,
arrow.PrimitiveTypes.Float64,
+ arrow.FixedWidthTypes.Float16,
arrow.BinaryTypes.String,
arrow.BinaryTypes.Binary,
&arrow.FixedSizeBinaryType{ByteWidth: 10},
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index b23c37ea39..95c477c78b 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -344,6 +344,10 @@ func fieldToNode(name string, field arrow.Field, props
*parquet.WriterProperties
} else {
logicalType = schema.NewTimeLogicalType(true,
schema.TimeUnitMicros)
}
+ case arrow.FLOAT16:
+ typ = parquet.Types.FixedLenByteArray
+ length = arrow.Float16SizeBytes
+ logicalType = schema.Float16LogicalType{}
case arrow.STRUCT:
return structToNode(field.Type.(*arrow.StructType), field.Name,
field.Nullable, props, arrprops)
case arrow.FIXED_SIZE_LIST, arrow.LIST:
@@ -597,6 +601,8 @@ func arrowFromFLBA(logical schema.LogicalType, length int)
(arrow.DataType, erro
return arrowDecimal(logtype), nil
case schema.NoLogicalType, schema.IntervalLogicalType,
schema.UUIDLogicalType:
return &arrow.FixedSizeBinaryType{ByteWidth: int(length)}, nil
+ case schema.Float16LogicalType:
+ return &arrow.Float16Type{}, nil
default:
return nil, xerrors.New("unhandled logical type " +
logical.String() + " for fixed-length byte array")
}
diff --git a/go/parquet/pqarrow/schema_test.go
b/go/parquet/pqarrow/schema_test.go
index b5e7dc8fad..ee5aad8913 100644
--- a/go/parquet/pqarrow/schema_test.go
+++ b/go/parquet/pqarrow/schema_test.go
@@ -280,6 +280,25 @@ func TestConvertArrowDecimals(t *testing.T) {
}
}
+func TestConvertArrowFloat16(t *testing.T) {
+ parquetFields := make(schema.FieldList, 0)
+ arrowFields := make([]arrow.Field, 0)
+
+ parquetFields = append(parquetFields,
schema.Must(schema.NewPrimitiveNodeLogical("float16",
parquet.Repetitions.Required,
+ schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray,
2, -1)))
+ arrowFields = append(arrowFields, arrow.Field{Name: "float16", Type:
&arrow.Float16Type{}})
+
+ arrowSchema := arrow.NewSchema(arrowFields, nil)
+ parquetSchema :=
schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema",
parquet.Repetitions.Repeated, parquetFields, -1)))
+
+ result, err := pqarrow.ToParquet(arrowSchema, nil,
pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
+ assert.NoError(t, err)
+ assert.True(t, parquetSchema.Equals(result))
+ for i := 0; i < parquetSchema.NumColumns(); i++ {
+ assert.Truef(t,
parquetSchema.Column(i).Equals(result.Column(i)), "Column %d didn't match: %s",
i, parquetSchema.Column(i).Name())
+ }
+}
+
func TestCoerceTImestampV1(t *testing.T) {
parquetFields := make(schema.FieldList, 0)
arrowFields := make([]arrow.Field, 0)
@@ -418,7 +437,6 @@ func TestUnsupportedTypes(t *testing.T) {
typ arrow.DataType
}{
// Non-exhaustive list of unsupported types
- {typ: &arrow.Float16Type{}},
{typ: &arrow.DurationType{}},
{typ: &arrow.DayTimeIntervalType{}},
{typ: &arrow.MonthIntervalType{}},
diff --git a/go/parquet/schema/logical_types.go
b/go/parquet/schema/logical_types.go
index 1ea44fc56c..69e6936388 100644
--- a/go/parquet/schema/logical_types.go
+++ b/go/parquet/schema/logical_types.go
@@ -68,6 +68,8 @@ func getLogicalType(l *format.LogicalType) LogicalType {
return BSONLogicalType{}
case l.IsSetUUID():
return UUIDLogicalType{}
+ case l.IsSetFLOAT16():
+ return Float16LogicalType{}
case l == nil:
return NoLogicalType{}
default:
@@ -1064,6 +1066,50 @@ func (IntervalLogicalType) Equals(rhs LogicalType) bool {
return ok
}
+// Float16LogicalType can only be used with a FixedLength byte array column
+// that is exactly 2 bytes long
+type Float16LogicalType struct{ baseLogicalType }
+
+func (Float16LogicalType) SortOrder() SortOrder {
+ return SortSIGNED
+}
+
+func (Float16LogicalType) MarshalJSON() ([]byte, error) {
+ return json.Marshal(map[string]string{"Type":
Float16LogicalType{}.String()})
+}
+
+func (Float16LogicalType) String() string {
+ return "Float16"
+}
+
+func (Float16LogicalType) ToConvertedType() (ConvertedType, DecimalMetadata) {
+ return ConvertedTypes.None, DecimalMetadata{}
+}
+
+func (Float16LogicalType) IsCompatible(c ConvertedType, dec DecimalMetadata)
bool {
+ if dec.IsSet {
+ return false
+ }
+ switch c {
+ case ConvertedTypes.None, ConvertedTypes.NA:
+ return true
+ }
+ return false
+}
+
+func (Float16LogicalType) IsApplicable(t parquet.Type, tlen int32) bool {
+ return t == parquet.Types.FixedLenByteArray && tlen == 2
+}
+
+func (Float16LogicalType) toThrift() *format.LogicalType {
+ return &format.LogicalType{FLOAT16: format.NewFloat16Type()}
+}
+
+func (Float16LogicalType) Equals(rhs LogicalType) bool {
+ _, ok := rhs.(Float16LogicalType)
+ return ok
+}
+
type NullLogicalType struct{ baseLogicalType }
func (NullLogicalType) SortOrder() SortOrder {
diff --git a/go/parquet/schema/logical_types_test.go
b/go/parquet/schema/logical_types_test.go
index 49edf1748c..c371b47714 100644
--- a/go/parquet/schema/logical_types_test.go
+++ b/go/parquet/schema/logical_types_test.go
@@ -158,6 +158,7 @@ func TestNewTypeIncompatibility(t *testing.T) {
expected schema.LogicalType
}{
{"uuid", schema.UUIDLogicalType{}, schema.UUIDLogicalType{}},
+ {"float16", schema.Float16LogicalType{},
schema.Float16LogicalType{}},
{"null", schema.NullLogicalType{}, schema.NullLogicalType{}},
{"not-utc-time_milli", schema.NewTimeLogicalType(false /*
adjutedToUTC */, schema.TimeUnitMillis), &schema.TimeLogicalType{}},
{"not-utc-time-micro", schema.NewTimeLogicalType(false /*
adjutedToUTC */, schema.TimeUnitMicros), &schema.TimeLogicalType{}},
@@ -224,6 +225,7 @@ func TestLogicalTypeProperties(t *testing.T) {
{"json", schema.JSONLogicalType{}, false, true, true},
{"bson", schema.BSONLogicalType{}, false, true, true},
{"uuid", schema.UUIDLogicalType{}, false, true, true},
+ {"float16", schema.Float16LogicalType{}, false, true, true},
{"nological", schema.NoLogicalType{}, false, false, true},
{"unknown", schema.UnknownLogicalType{}, false, false, false},
}
@@ -358,6 +360,14 @@ func TestLogicalInapplicableTypes(t *testing.T) {
assert.False(t, logical.IsApplicable(tt.typ, tt.len))
})
}
+
+ logical = schema.Float16LogicalType{}
+ assert.True(t, logical.IsApplicable(parquet.Types.FixedLenByteArray, 2))
+ for _, tt := range tests {
+ t.Run("float16 "+tt.name, func(t *testing.T) {
+ assert.False(t, logical.IsApplicable(tt.typ, tt.len))
+ })
+ }
}
func TestDecimalLogicalTypeApplicability(t *testing.T) {
@@ -445,6 +455,7 @@ func TestLogicalTypeRepresentation(t *testing.T) {
{"json", schema.JSONLogicalType{}, "JSON", `{"Type": "JSON"}`},
{"bson", schema.BSONLogicalType{}, "BSON", `{"Type": "BSON"}`},
{"uuid", schema.UUIDLogicalType{}, "UUID", `{"Type": "UUID"}`},
+ {"float16", schema.Float16LogicalType{}, "Float16", `{"Type":
"Float16"}`},
{"none", schema.NoLogicalType{}, "None", `{"Type": "None"}`},
}
@@ -490,6 +501,7 @@ func TestLogicalTypeSortOrder(t *testing.T) {
{"json", schema.JSONLogicalType{}, schema.SortUNSIGNED},
{"bson", schema.BSONLogicalType{}, schema.SortUNSIGNED},
{"uuid", schema.UUIDLogicalType{}, schema.SortUNSIGNED},
+ {"float16", schema.Float16LogicalType{}, schema.SortSIGNED},
{"none", schema.NoLogicalType{}, schema.SortUNKNOWN},
}
diff --git a/go/parquet/schema/reflection.go b/go/parquet/schema/reflection.go
index d79edb9240..c0c8e0533e 100644
--- a/go/parquet/schema/reflection.go
+++ b/go/parquet/schema/reflection.go
@@ -22,6 +22,7 @@ import (
"strconv"
"strings"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/parquet"
format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet"
"golang.org/x/xerrors"
@@ -159,6 +160,8 @@ func (t *taggedInfo) UpdateLogicalTypes() {
return BSONLogicalType{}
case "uuid":
return UUIDLogicalType{}
+ case "float16":
+ return Float16LogicalType{}
default:
panic(fmt.Errorf("invalid logical type specified: %s",
t))
}
@@ -373,6 +376,9 @@ func typeToNode(name string, typ reflect.Type, repType
parquet.Repetition, info
}
return Must(MapOf(name, key, value, repType, fieldID))
case reflect.Struct:
+ if typ == reflect.TypeOf(float16.Num{}) {
+ return MustPrimitive(NewPrimitiveNodeLogical(name,
repType, Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID))
+ }
// structs are Group nodes
fields := make(FieldList, 0)
for i := 0; i < typ.NumField(); i++ {
diff --git a/go/parquet/schema/reflection_test.go
b/go/parquet/schema/reflection_test.go
index 06ad7191a5..e3a880cacc 100644
--- a/go/parquet/schema/reflection_test.go
+++ b/go/parquet/schema/reflection_test.go
@@ -22,6 +22,7 @@ import (
"reflect"
"testing"
+ "github.com/apache/arrow/go/v15/arrow/float16"
"github.com/apache/arrow/go/v15/parquet"
"github.com/apache/arrow/go/v15/parquet/schema"
"github.com/stretchr/testify/assert"
@@ -152,6 +153,9 @@ func ExampleNewSchemaFromStruct_logicaltypes() {
JSON string `parquet:"logical=json"`
BSON []byte `parquet:"logical=BSON"`
UUID [16]byte `parquet:"logical=uuid"`
+ Float16 [2]byte `parquet:"logical=float16"`
+ Float16Optional *[2]byte `parquet:"logical=float16"`
+ Float16Num float16.Num
}
sc, err := schema.NewSchemaFromStruct(LogicalTypes{})
@@ -180,6 +184,9 @@ func ExampleNewSchemaFromStruct_logicaltypes() {
// required byte_array field_id=-1 JSON (JSON);
// required byte_array field_id=-1 BSON (BSON);
// required fixed_len_byte_array field_id=-1 UUID (UUID);
+ // required fixed_len_byte_array field_id=-1 Float16 (Float16);
+ // optional fixed_len_byte_array field_id=-1 Float16Optional
(Float16);
+ // required fixed_len_byte_array field_id=-1 Float16Num (Float16);
// }
}
diff --git a/go/parquet/schema/schema_element_test.go
b/go/parquet/schema/schema_element_test.go
index dd1d293e5c..d190ffe5a2 100644
--- a/go/parquet/schema/schema_element_test.go
+++ b/go/parquet/schema/schema_element_test.go
@@ -159,6 +159,10 @@ func (s *SchemaElementConstructionSuite) TestSimple() {
"uuid", UUIDLogicalType{},
parquet.Types.FixedLenByteArray, 16, false, ConvertedTypes.NA, true,
func(e *format.SchemaElement) bool { return
e.LogicalType.IsSetUUID() },
}, nil},
+ {"float16", &schemaElementConstructArgs{
+ "float16", Float16LogicalType{},
parquet.Types.FixedLenByteArray, 2, false, ConvertedTypes.NA, true,
+ func(e *format.SchemaElement) bool { return
e.LogicalType.IsSetFLOAT16() },
+ }, nil},
{"none", &schemaElementConstructArgs{
"none", NoLogicalType{}, parquet.Types.Int64, -1,
false, ConvertedTypes.NA, false,
checkNone,
@@ -425,7 +429,8 @@ func TestSchemaElementNestedSerialization(t *testing.T) {
timestampNode := MustPrimitive(NewPrimitiveNodeLogical("timestamp"
/*name */, parquet.Repetitions.Required, NewTimestampLogicalType(false /*
adjustedToUTC */, TimeUnitNanos), parquet.Types.Int64, -1 /* type len */, -1 /*
fieldID */))
intNode := MustPrimitive(NewPrimitiveNodeLogical("int" /*name */,
parquet.Repetitions.Required, NewIntLogicalType(64 /* bitWidth */, false /*
signed */), parquet.Types.Int64, -1 /* type len */, -1 /* fieldID */))
decimalNode := MustPrimitive(NewPrimitiveNodeLogical("decimal" /*name
*/, parquet.Repetitions.Required, NewDecimalLogicalType(16 /* precision */, 6
/* scale */), parquet.Types.Int64, -1 /* type len */, -1 /* fieldID */))
- listNode := MustGroup(NewGroupNodeLogical("list" /*name */,
parquet.Repetitions.Repeated, []Node{strNode, dateNode, jsonNode, uuidNode,
timestampNode, intNode, decimalNode}, NewListLogicalType(), -1 /* fieldID */))
+ float16Node := MustPrimitive(NewPrimitiveNodeLogical("float16" /*name
*/, parquet.Repetitions.Required, Float16LogicalType{},
parquet.Types.FixedLenByteArray, 2 /* type len */, - /* fieldID */ 1))
+ listNode := MustGroup(NewGroupNodeLogical("list" /*name */,
parquet.Repetitions.Repeated, []Node{strNode, dateNode, jsonNode, uuidNode,
timestampNode, intNode, decimalNode, float16Node}, NewListLogicalType(), -1 /*
fieldID */))
listElems := ToThrift(listNode)
assert.Equal(t, "list", listElems[0].Name)
@@ -440,6 +445,7 @@ func TestSchemaElementNestedSerialization(t *testing.T) {
assert.True(t, listElems[5].LogicalType.IsSetTIMESTAMP())
assert.True(t, listElems[6].LogicalType.IsSetINTEGER())
assert.True(t, listElems[7].LogicalType.IsSetDECIMAL())
+ assert.True(t, listElems[8].LogicalType.IsSetFLOAT16())
mapNode := MustGroup(NewGroupNodeLogical("map" /* name */,
parquet.Repetitions.Required, []Node{}, MapLogicalType{}, -1 /* fieldID */))
mapElems := ToThrift(mapNode)
@@ -486,6 +492,7 @@ func TestLogicalTypeSerializationRoundTrip(t *testing.T) {
{"json", JSONLogicalType{}, parquet.Types.ByteArray, -1},
{"bson", BSONLogicalType{}, parquet.Types.ByteArray, -1},
{"uuid", UUIDLogicalType{}, parquet.Types.FixedLenByteArray,
16},
+ {"float16", Float16LogicalType{},
parquet.Types.FixedLenByteArray, 2},
{"none", NoLogicalType{}, parquet.Types.Boolean, -1},
}
diff --git a/go/parquet/schema/schema_test.go b/go/parquet/schema/schema_test.go
index b60c7dfaaf..cc43c3856d 100644
--- a/go/parquet/schema/schema_test.go
+++ b/go/parquet/schema/schema_test.go
@@ -635,6 +635,10 @@ func TestPanicSchemaNodeCreation(t *testing.T) {
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("uuid" /*
name */, parquet.Repetitions.Required, schema.UUIDLogicalType{},
parquet.Types.FixedLenByteArray, 64 /* type len */, -1 /* fieldID */))
}, "incompatible primitive length")
+ assert.Panics(t, func() {
+ schema.MustPrimitive(schema.NewPrimitiveNodeLogical("float16"
/* name */, parquet.Repetitions.Required, schema.Float16LogicalType{},
parquet.Types.FixedLenByteArray, 4 /* type len */, -1 /* fieldID */))
+ }, "incompatible primitive length")
+
assert.Panics(t, func() {
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("negative_len" /* name */,
parquet.Repetitions.Required, schema.NoLogicalType{},
parquet.Types.FixedLenByteArray, -16 /* type len */, -1 /* fieldID */))
}, "non-positive length for fixed length binary")