zeroshade commented on a change in pull request #10071: URL: https://github.com/apache/arrow/pull/10071#discussion_r619902201
########## File path: go/parquet/schema/converted_types.go ########## @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// ConvertedType corresponds to the ConvertedType in the parquet.Thrift, +// with added values of None and NA for handling when these values are not +// set in the metadata +type ConvertedType format.ConvertedType + +var ( + // ConvertedTypes is a struct containing the constants for the types + // to make it easy to reference them while making it clear what they are + ConvertedTypes = struct { + None ConvertedType + UTF8 ConvertedType + Map ConvertedType + MapKeyValue ConvertedType + List ConvertedType + Enum ConvertedType + Decimal ConvertedType + Date ConvertedType + TimeMillis ConvertedType + TimeMicros ConvertedType + TimestampMillis ConvertedType + TimestampMicros ConvertedType + Uint8 ConvertedType + Uint16 ConvertedType + Uint32 ConvertedType + Uint64 ConvertedType + Int8 ConvertedType + Int16 ConvertedType + Int32 ConvertedType + Int64 ConvertedType + JSON ConvertedType + BSON ConvertedType + Interval ConvertedType + NA ConvertedType + }{ + None: -1, // thrift enum starts at 
0, so we know this will not be used + UTF8: ConvertedType(format.ConvertedType_UTF8), + Map: ConvertedType(format.ConvertedType_MAP), + MapKeyValue: ConvertedType(format.ConvertedType_MAP_KEY_VALUE), + List: ConvertedType(format.ConvertedType_LIST), + Enum: ConvertedType(format.ConvertedType_ENUM), + Decimal: ConvertedType(format.ConvertedType_DECIMAL), + Date: ConvertedType(format.ConvertedType_DATE), + TimeMillis: ConvertedType(format.ConvertedType_TIME_MILLIS), + TimeMicros: ConvertedType(format.ConvertedType_TIME_MICROS), + TimestampMillis: ConvertedType(format.ConvertedType_TIMESTAMP_MILLIS), + TimestampMicros: ConvertedType(format.ConvertedType_TIMESTAMP_MICROS), + Uint8: ConvertedType(format.ConvertedType_UINT_8), + Uint16: ConvertedType(format.ConvertedType_UINT_16), + Uint32: ConvertedType(format.ConvertedType_UINT_32), + Uint64: ConvertedType(format.ConvertedType_UINT_64), + Int8: ConvertedType(format.ConvertedType_INT_8), + Int16: ConvertedType(format.ConvertedType_INT_16), + Int32: ConvertedType(format.ConvertedType_INT_32), + Int64: ConvertedType(format.ConvertedType_INT_64), + JSON: ConvertedType(format.ConvertedType_JSON), + BSON: ConvertedType(format.ConvertedType_BSON), + Interval: ConvertedType(format.ConvertedType_INTERVAL), + NA: 24, // should always be the last values after Interval + } +) + +func (p ConvertedType) String() string { + switch p { + case ConvertedTypes.None: + return "NONE" + case ConvertedTypes.NA: + return "UNKNOWN" + default: + return format.ConvertedType(p).String() + } +} + +// ToLogicalType returns the correct LogicalType for the given ConvertedType, using the decimal +// metadata provided to define the precision/scale if necessary +func (p ConvertedType) ToLogicalType(convertedDecimal DecimalMetadata) LogicalType { + switch p { + case ConvertedTypes.UTF8: + return StringLogicalType{} + case ConvertedTypes.Map, ConvertedTypes.MapKeyValue: + return MapLogicalType{} + case ConvertedTypes.List: + return ListLogicalType{} + 
case ConvertedTypes.Enum: + return EnumLogicalType{} + case ConvertedTypes.Decimal: + return NewDecimalLogicalType(convertedDecimal.Precision, convertedDecimal.Scale) + case ConvertedTypes.Date: + return DateLogicalType{} + case ConvertedTypes.TimeMillis: + return NewTimeLogicalType(true, TimeUnitMillis) + case ConvertedTypes.TimeMicros: + return NewTimeLogicalType(true, TimeUnitMicros) + case ConvertedTypes.TimestampMillis: + t := NewTimestampLogicalType(true, TimeUnitMillis) + t.(*TimestampLogicalType).fromConverted = true + return t + case ConvertedTypes.TimestampMicros: + t := NewTimestampLogicalType(true, TimeUnitMicros) + t.(*TimestampLogicalType).fromConverted = true + return t + case ConvertedTypes.Interval: + return IntervalLogicalType{} + case ConvertedTypes.Int8: + return NewIntLogicalType(8, true) + case ConvertedTypes.Int16: + return NewIntLogicalType(16, true) + case ConvertedTypes.Int32: + return NewIntLogicalType(32, true) + case ConvertedTypes.Int64: + return NewIntLogicalType(64, true) + case ConvertedTypes.Uint8: + return NewIntLogicalType(8, false) + case ConvertedTypes.Uint16: + return NewIntLogicalType(16, false) + case ConvertedTypes.Uint32: + return NewIntLogicalType(32, false) + case ConvertedTypes.Uint64: + return NewIntLogicalType(64, false) + case ConvertedTypes.JSON: + return JSONLogicalType{} + case ConvertedTypes.BSON: + return BSONLogicalType{} + case ConvertedTypes.None: + return NoLogicalType{} + case ConvertedTypes.NA: + fallthrough + default: + return UnknownLogicalType{} + } +} + +// GetSortOrder defaults to the sort order based on the physical type if convert +// is ConvertedTypes.None, otherwise determines the sort order by the converted type. 
+func GetSortOrder(convert ConvertedType, primitive format.Type) SortOrder { + if convert == ConvertedTypes.None { + return DefaultSortOrder(primitive) + } + switch convert { + case ConvertedTypes.Int8, + ConvertedTypes.Int16, + ConvertedTypes.Int32, + ConvertedTypes.Int64, + ConvertedTypes.Date, + ConvertedTypes.TimeMicros, + ConvertedTypes.TimeMillis, + ConvertedTypes.TimestampMicros, + ConvertedTypes.TimestampMillis: + return SortSIGNED + case ConvertedTypes.Uint8, + ConvertedTypes.Uint16, + ConvertedTypes.Uint32, + ConvertedTypes.Uint64, + ConvertedTypes.Enum, + ConvertedTypes.UTF8, + ConvertedTypes.BSON, + ConvertedTypes.JSON: + return SortUNSIGNED + case ConvertedTypes.Decimal, Review comment: So I definitely copied this from the C++ implementation; I believe it's "sortUNKNOWN" because the sort order can differ based on the underlying physical type. That said, now that I've read the spec, my current implementation of statistics sorting for decimals is incorrect :) so I'll fix that in the rest of my code before I put that part of the chunks up, haha. Since int32 and int64, the underlying types for the decimal, use signed comparison, I think I'll change this to SortSIGNED, because the spec defines decimals as sorted by signed comparison. ########## File path: go/parquet/schema/logical_types.go ########## @@ -0,0 +1,1089 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "encoding/json" + "fmt" + "math" + + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/internal/debug" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// DecimalMetadata is a struct for managing scale and precision information between +// converted and logical types. +type DecimalMetadata struct { + IsSet bool + Scale int32 + Precision int32 +} + +func getLogicalType(l *format.LogicalType) LogicalType { + switch { + case l.IsSetSTRING(): + return StringLogicalType{} + case l.IsSetMAP(): + return MapLogicalType{} + case l.IsSetLIST(): + return ListLogicalType{} + case l.IsSetENUM(): + return EnumLogicalType{} + case l.IsSetDECIMAL(): + return &DecimalLogicalType{typ: l.DECIMAL} + case l.IsSetDATE(): + return DateLogicalType{} + case l.IsSetTIME(): + if timeUnitFromThrift(l.TIME.Unit) == TimeUnitUnknown { + panic("parquet: TimeUnit must be one of MILLIS, MICROS, or NANOS for Time logical type") + } + return &TimeLogicalType{typ: l.TIME} + case l.IsSetTIMESTAMP(): + if timeUnitFromThrift(l.TIMESTAMP.Unit) == TimeUnitUnknown { + panic("parquet: TimeUnit must be one of MILLIS, MICROS, or NANOS for Timestamp logical type") + } + return &TimestampLogicalType{typ: l.TIMESTAMP} + case l.IsSetINTEGER(): + return &IntLogicalType{typ: l.INTEGER} + case l.IsSetUNKNOWN(): + return NullLogicalType{} + case l.IsSetJSON(): + return JSONLogicalType{} + case l.IsSetBSON(): + return BSONLogicalType{} + case l.IsSetUUID(): + return UUIDLogicalType{} + case l == nil: + return 
NoLogicalType{} + default: + panic("invalid logical type") + } +} + +// TimeUnitType is an enum for denoting whether a time based logical type +// is using milliseconds, microseconds or nanoseconds. +type TimeUnitType int + +// Constants for the TimeUnitType +const ( + TimeUnitMillis TimeUnitType = iota + TimeUnitMicros + TimeUnitNanos + TimeUnitUnknown +) + +// LogicalType is the descriptor that defines the usage of a physical primitive +// type in the schema, such as an Interval, Date, etc. +type LogicalType interface { + // Returns true if a nested type like List or Map + IsNested() bool + // Returns true if this type can be serialized, ie: not Unknown/NoType/Interval Review comment: https://github.com/apache/arrow/blob/master/cpp/src/parquet/parquet.thrift#L340 The current parquet.thrift file has the logical type for INTERVAL commented out, so the generated code doesn't contain a LogicalType for Interval, and therefore it can't be serialized. This behavior also matches the C++ implementation: https://github.com/apache/arrow/blob/master/cpp/src/parquet/schema.cc#L520 ########## File path: go/parquet/schema/reflection_test.go ########## @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package schema_test + +import ( + "log" + "os" + "reflect" + "testing" + + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/schema" + "github.com/stretchr/testify/assert" +) + +func ExampleNewSchemaFromStruct_primitives() { + type Schema struct { + Bool bool + Int8 int8 + Uint16 uint16 + Int32 int32 + Int64 int64 + Int96 parquet.Int96 + Float float32 + Double float64 + ByteArray string + FixedLenByteArray [10]byte + } + + sc, err := schema.NewSchemaFromStruct(Schema{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 Schema { + // required boolean field_id=-1 Bool; + // required int32 field_id=-1 Int8 (Int(bitWidth=8, isSigned=true)); + // required int32 field_id=-1 Uint16 (Int(bitWidth=16, isSigned=false)); + // required int32 field_id=-1 Int32 (Int(bitWidth=32, isSigned=true)); + // required int64 field_id=-1 Int64 (Int(bitWidth=64, isSigned=true)); + // required int96 field_id=-1 Int96; + // required float field_id=-1 Float; + // required double field_id=-1 Double; + // required byte_array field_id=-1 ByteArray; + // required fixed_len_byte_array field_id=-1 FixedLenByteArray; + // } +} + +func ExampleNewSchemaFromStruct_convertedtypes() { + type ConvertedSchema struct { + Utf8 string `parquet:"name=utf8, converted=UTF8"` + Uint32 uint32 `parquet:"converted=INT_32"` + Date int32 `parquet:"name=date, converted=date"` + TimeMilli int32 `parquet:"name=timemilli, converted=TIME_MILLIS"` + TimeMicro int64 `parquet:"name=timemicro, converted=time_micros"` + TimeStampMilli int64 `parquet:"converted=timestamp_millis"` + TimeStampMicro int64 `parquet:"converted=timestamp_micros"` + Interval parquet.Int96 `parquet:"converted=INTERVAL"` + Decimal1 int32 `parquet:"converted=decimal, scale=2, precision=9"` + Decimal2 int64 `parquet:"converted=decimal, scale=2, precision=18"` + Decimal3 [12]byte `parquet:"converted=decimal, scale=2, precision=10"` + Decimal4 
string `parquet:"converted=decimal, scale=2, precision=20"` + } + + sc, err := schema.NewSchemaFromStruct(&ConvertedSchema{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 ConvertedSchema { + // required byte_array field_id=-1 utf8 (String); + // required int32 field_id=-1 Uint32 (Int(bitWidth=32, isSigned=true)); + // required int32 field_id=-1 date (Date); + // required int32 field_id=-1 timemilli (Time(isAdjustedToUTC=true, timeUnit=milliseconds)); + // required int64 field_id=-1 timemicro (Time(isAdjustedToUTC=true, timeUnit=microseconds)); + // required int64 field_id=-1 TimeStampMilli (Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, is_from_converted_type=true, force_set_converted_type=false)); + // required int64 field_id=-1 TimeStampMicro (Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, is_from_converted_type=true, force_set_converted_type=false)); + // required int96 field_id=-1 Interval; + // required int32 field_id=-1 Decimal1 (Decimal(precision=9, scale=2)); + // required int64 field_id=-1 Decimal2 (Decimal(precision=18, scale=2)); + // required fixed_len_byte_array field_id=-1 Decimal3 (Decimal(precision=10, scale=2)); + // required byte_array field_id=-1 Decimal4 (Decimal(precision=20, scale=2)); + // } +} + +func ExampleNewSchemaFromStruct_repetition() { + type RepetitionSchema struct { + List []int64 `parquet:"fieldid=1"` + Repeated []int64 `parquet:"repetition=repeated, fieldid=2"` + Optional *int64 `parquet:"fieldid=3"` + Required *int64 `parquet:"repetition=REQUIRED, fieldid=4"` + Opt int64 `parquet:"repetition=OPTIONAL, fieldid=5"` + } + + sc, err := schema.NewSchemaFromStruct(RepetitionSchema{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 RepetitionSchema { + // required group field_id=1 List (List) { + // repeated group field_id=-1 list { + // required 
int64 field_id=-1 element (Int(bitWidth=64, isSigned=true)); + // } + // } + // repeated int64 field_id=2 Repeated (Int(bitWidth=64, isSigned=true)); + // optional int64 field_id=3 Optional (Int(bitWidth=64, isSigned=true)); + // required int64 field_id=4 Required (Int(bitWidth=64, isSigned=true)); + // optional int64 field_id=5 Opt (Int(bitWidth=64, isSigned=true)); + // } +} + +func ExampleNewSchemaFromStruct_logicaltypes() { + type LogicalTypes struct { + String []byte `parquet:"logical=String"` + Enum string `parquet:"logical=enum"` + Date int32 `parquet:"logical=date"` + Decimal1 int32 `parquet:"logical=decimal, precision=9, scale=2"` + Decimal2 int32 `parquet:"logical=decimal, logical.precision=9, scale=2"` + Decimal3 int32 `parquet:"logical=decimal, precision=5, logical.precision=9, scale=1, logical.scale=3"` + TimeMilliUTC int32 `parquet:"logical=TIME, logical.unit=millis"` + TimeMilli int32 `parquet:"logical=Time, logical.unit=millis, logical.isadjustedutc=false"` + TimeMicros int64 `parquet:"logical=time, logical.unit=micros, logical.isadjustedutc=false"` + TimeMicrosUTC int64 `parquet:"logical=time, logical.unit=micros, logical.isadjustedutc=true"` + TimeNanos int64 `parquet:"logical=time, logical.unit=nanos"` + TimestampMilli int64 `parquet:"logical=timestamp, logical.unit=millis"` + TimestampMicrosNotUTC int64 `parquet:"logical=timestamp, logical.unit=micros, logical.isadjustedutc=false"` + TimestampNanos int64 `parquet:"logical=timestamp, logical.unit=nanos"` + JSON string `parquet:"logical=json"` + BSON []byte `parquet:"logical=BSON"` + UUID [16]byte `parquet:"logical=uuid"` + } + + sc, err := schema.NewSchemaFromStruct(LogicalTypes{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 LogicalTypes { + // required byte_array field_id=-1 String (String); + // required byte_array field_id=-1 Enum (Enum); + // required int32 field_id=-1 Date (Date); + // required int32 
field_id=-1 Decimal1 (Decimal(precision=9, scale=2)); + // required int32 field_id=-1 Decimal2 (Decimal(precision=9, scale=2)); + // required int32 field_id=-1 Decimal3 (Decimal(precision=9, scale=3)); + // required int32 field_id=-1 TimeMilliUTC (Time(isAdjustedToUTC=true, timeUnit=milliseconds)); + // required int32 field_id=-1 TimeMilli (Time(isAdjustedToUTC=false, timeUnit=milliseconds)); + // required int64 field_id=-1 TimeMicros (Time(isAdjustedToUTC=false, timeUnit=microseconds)); + // required int64 field_id=-1 TimeMicrosUTC (Time(isAdjustedToUTC=true, timeUnit=microseconds)); + // required int64 field_id=-1 TimeNanos (Time(isAdjustedToUTC=true, timeUnit=nanoseconds)); + // required int64 field_id=-1 TimestampMilli (Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, is_from_converted_type=false, force_set_converted_type=false)); + // required int64 field_id=-1 TimestampMicrosNotUTC (Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false)); + // required int64 field_id=-1 TimestampNanos (Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, is_from_converted_type=false, force_set_converted_type=false)); + // required byte_array field_id=-1 JSON (JSON); + // required byte_array field_id=-1 BSON (BSON); + // required fixed_len_byte_array field_id=-1 UUID (UUID); + // } +} + +func ExampleNewSchemaFromStruct_physicaltype() { + type ChangeTypes struct { + Int32 int64 `parquet:"type=int32"` + FixedLen string `parquet:"type=fixed_len_byte_array, length=10"` + SliceAsFixed []byte `parquet:"type=fixed_len_byte_array, length=12"` + Int int `parquet:"type=int32"` + } + + sc, err := schema.NewSchemaFromStruct(ChangeTypes{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 ChangeTypes { + // required int32 field_id=-1 Int32 (Int(bitWidth=32, isSigned=true)); + // required fixed_len_byte_array field_id=-1 FixedLen; + // 
required fixed_len_byte_array field_id=-1 SliceAsFixed; + // required int32 field_id=-1 Int (Int(bitWidth=32, isSigned=true)); + // } +} + +func ExampleNewSchemaFromStruct_nestedtypes() { + type Other struct { + OptionalMap *map[string]*string `parquet:"valuerepetition=required, keylogical=String, valueconverted=BSON"` + } + + type MyMap map[int32]string + + type Nested struct { + SimpleMap map[int32]string + FixedLenMap map[string][]byte `parquet:"keytype=fixed_len_byte_array, keyfieldid=10, valuefieldid=11, keylength=10"` + DecimalMap map[int32]string `parquet:"logical=map, keyconverted=DECIMAL, keyscale=3, keyprecision=7, valuetype=fixed_len_byte_array, valuelength=4, valuelogical=decimal, valuelogical.precision=9, valuescale=2"` + OtherList []*Other + OtherRepeated []Other `parquet:"repetition=repeated"` + DateArray [5]int32 `parquet:"valuelogical=date, logical=list"` + DateMap MyMap `parquet:"keylogical=TIME, keylogical.unit=MILLIS, keylogical.isadjustedutc=false, valuelogical=enum"` + } + + sc, err := schema.NewSchemaFromStruct(Nested{}) + if err != nil { + log.Fatal(err) + } + + schema.PrintSchema(sc.Root(), os.Stdout, 2) + + // Output: + // repeated group field_id=-1 Nested { Review comment: I'm not sure what you mean, what assertion are you referring to? ########## File path: go/parquet/schema/column.go ########## @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "fmt" + "strings" + + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// Column encapsulates the information necessary to interpret primitive +// column data in the context of a particular schema. We have to examine +// the node structure of a column's path to the root in the schema tree +// to be able to reassemble the nested structure from the repetition and +// definition levels. +type Column struct { + pnode *PrimitiveNode + maxDefLvl int16 Review comment: comment added. ########## File path: go/parquet/schema/column.go ########## @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schema + +import ( + "fmt" + "strings" + + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "golang.org/x/xerrors" +) + +// Column encapsulates the information necessary to interpret primitive +// column data in the context of a particular schema. We have to examine +// the node structure of a column's path to the root in the schema tree +// to be able to reassemble the nested structure from the repetition and +// definition levels. +type Column struct { + pnode *PrimitiveNode + // the maximum definition level in this column + maxDefLvl int16 + // the maximum repetition level in this column Review comment: updated ########## File path: go/parquet/schema/converted_types.go ########## @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schema + +import ( + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// ConvertedType corresponds to the ConvertedType in the parquet.Thrift, +// with added values of None and NA for handling when these values are not +// set in the metadata +type ConvertedType format.ConvertedType + +var ( + // ConvertedTypes is a struct containing the constants for the types + // to make it easy to reference them while making it clear what they are + ConvertedTypes = struct { + None ConvertedType + UTF8 ConvertedType + Map ConvertedType + MapKeyValue ConvertedType + List ConvertedType + Enum ConvertedType + Decimal ConvertedType + Date ConvertedType + TimeMillis ConvertedType + TimeMicros ConvertedType + TimestampMillis ConvertedType + TimestampMicros ConvertedType + Uint8 ConvertedType + Uint16 ConvertedType + Uint32 ConvertedType + Uint64 ConvertedType + Int8 ConvertedType + Int16 ConvertedType + Int32 ConvertedType + Int64 ConvertedType + JSON ConvertedType + BSON ConvertedType + Interval ConvertedType + NA ConvertedType + }{ + None: -1, // thrift enum starts at 0, so we know this will not be used + UTF8: ConvertedType(format.ConvertedType_UTF8), + Map: ConvertedType(format.ConvertedType_MAP), + MapKeyValue: ConvertedType(format.ConvertedType_MAP_KEY_VALUE), + List: ConvertedType(format.ConvertedType_LIST), + Enum: ConvertedType(format.ConvertedType_ENUM), + Decimal: ConvertedType(format.ConvertedType_DECIMAL), + Date: ConvertedType(format.ConvertedType_DATE), + TimeMillis: ConvertedType(format.ConvertedType_TIME_MILLIS), + TimeMicros: ConvertedType(format.ConvertedType_TIME_MICROS), + TimestampMillis: ConvertedType(format.ConvertedType_TIMESTAMP_MILLIS), + TimestampMicros: ConvertedType(format.ConvertedType_TIMESTAMP_MICROS), + Uint8: ConvertedType(format.ConvertedType_UINT_8), + Uint16: ConvertedType(format.ConvertedType_UINT_16), + Uint32: ConvertedType(format.ConvertedType_UINT_32), + Uint64: ConvertedType(format.ConvertedType_UINT_64), 
+ Int8: ConvertedType(format.ConvertedType_INT_8), + Int16: ConvertedType(format.ConvertedType_INT_16), + Int32: ConvertedType(format.ConvertedType_INT_32), + Int64: ConvertedType(format.ConvertedType_INT_64), + JSON: ConvertedType(format.ConvertedType_JSON), + BSON: ConvertedType(format.ConvertedType_BSON), + Interval: ConvertedType(format.ConvertedType_INTERVAL), + NA: 24, // should always be the last values after Interval + } +) + +func (p ConvertedType) String() string { + switch p { + case ConvertedTypes.None: + return "NONE" + case ConvertedTypes.NA: + return "UNKNOWN" + default: + return format.ConvertedType(p).String() + } +} + +// ToLogicalType returns the correct LogicalType for the given ConvertedType, using the decimal +// metadata provided to define the precision/scale if necessary +func (p ConvertedType) ToLogicalType(convertedDecimal DecimalMetadata) LogicalType { + switch p { + case ConvertedTypes.UTF8: + return StringLogicalType{} + case ConvertedTypes.Map, ConvertedTypes.MapKeyValue: + return MapLogicalType{} + case ConvertedTypes.List: + return ListLogicalType{} + case ConvertedTypes.Enum: + return EnumLogicalType{} + case ConvertedTypes.Decimal: + return NewDecimalLogicalType(convertedDecimal.Precision, convertedDecimal.Scale) + case ConvertedTypes.Date: + return DateLogicalType{} + case ConvertedTypes.TimeMillis: + return NewTimeLogicalType(true, TimeUnitMillis) Review comment: done ########## File path: go/parquet/schema/converted_types.go ########## @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" +) + +// ConvertedType corresponds to the ConvertedType in the parquet.Thrift, +// with added values of None and NA for handling when these values are not +// set in the metadata +type ConvertedType format.ConvertedType + +var ( + // ConvertedTypes is a struct containing the constants for the types + // to make it easy to reference them while making it clear what they are + ConvertedTypes = struct { + None ConvertedType + UTF8 ConvertedType + Map ConvertedType + MapKeyValue ConvertedType + List ConvertedType + Enum ConvertedType + Decimal ConvertedType + Date ConvertedType + TimeMillis ConvertedType + TimeMicros ConvertedType + TimestampMillis ConvertedType + TimestampMicros ConvertedType + Uint8 ConvertedType + Uint16 ConvertedType + Uint32 ConvertedType + Uint64 ConvertedType + Int8 ConvertedType + Int16 ConvertedType + Int32 ConvertedType + Int64 ConvertedType + JSON ConvertedType + BSON ConvertedType + Interval ConvertedType + NA ConvertedType + }{ + None: -1, // thrift enum starts at 0, so we know this will not be used + UTF8: ConvertedType(format.ConvertedType_UTF8), + Map: ConvertedType(format.ConvertedType_MAP), + MapKeyValue: ConvertedType(format.ConvertedType_MAP_KEY_VALUE), + List: ConvertedType(format.ConvertedType_LIST), + Enum: ConvertedType(format.ConvertedType_ENUM), + Decimal: ConvertedType(format.ConvertedType_DECIMAL), + Date: ConvertedType(format.ConvertedType_DATE), + TimeMillis: ConvertedType(format.ConvertedType_TIME_MILLIS), + 
TimeMicros: ConvertedType(format.ConvertedType_TIME_MICROS), + TimestampMillis: ConvertedType(format.ConvertedType_TIMESTAMP_MILLIS), + TimestampMicros: ConvertedType(format.ConvertedType_TIMESTAMP_MICROS), + Uint8: ConvertedType(format.ConvertedType_UINT_8), + Uint16: ConvertedType(format.ConvertedType_UINT_16), + Uint32: ConvertedType(format.ConvertedType_UINT_32), + Uint64: ConvertedType(format.ConvertedType_UINT_64), + Int8: ConvertedType(format.ConvertedType_INT_8), + Int16: ConvertedType(format.ConvertedType_INT_16), + Int32: ConvertedType(format.ConvertedType_INT_32), + Int64: ConvertedType(format.ConvertedType_INT_64), + JSON: ConvertedType(format.ConvertedType_JSON), + BSON: ConvertedType(format.ConvertedType_BSON), + Interval: ConvertedType(format.ConvertedType_INTERVAL), + NA: 24, // should always be the last values after Interval + } +) + +func (p ConvertedType) String() string { + switch p { + case ConvertedTypes.None: + return "NONE" + case ConvertedTypes.NA: + return "UNKNOWN" + default: + return format.ConvertedType(p).String() + } +} + +// ToLogicalType returns the correct LogicalType for the given ConvertedType, using the decimal +// metadata provided to define the precision/scale if necessary +func (p ConvertedType) ToLogicalType(convertedDecimal DecimalMetadata) LogicalType { + switch p { + case ConvertedTypes.UTF8: + return StringLogicalType{} + case ConvertedTypes.Map, ConvertedTypes.MapKeyValue: + return MapLogicalType{} + case ConvertedTypes.List: + return ListLogicalType{} + case ConvertedTypes.Enum: + return EnumLogicalType{} + case ConvertedTypes.Decimal: + return NewDecimalLogicalType(convertedDecimal.Precision, convertedDecimal.Scale) + case ConvertedTypes.Date: + return DateLogicalType{} + case ConvertedTypes.TimeMillis: + return NewTimeLogicalType(true, TimeUnitMillis) + case ConvertedTypes.TimeMicros: + return NewTimeLogicalType(true, TimeUnitMicros) + case ConvertedTypes.TimestampMillis: + t := NewTimestampLogicalType(true, 
TimeUnitMillis) + t.(*TimestampLogicalType).fromConverted = true + return t + case ConvertedTypes.TimestampMicros: + t := NewTimestampLogicalType(true, TimeUnitMicros) + t.(*TimestampLogicalType).fromConverted = true + return t + case ConvertedTypes.Interval: + return IntervalLogicalType{} + case ConvertedTypes.Int8: + return NewIntLogicalType(8, true) Review comment: done ########## File path: go/parquet/schema/helpers.go ########## @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import "github.com/apache/arrow/go/parquet" + +// ListOf is a convenience helper function to create a properly structured +// list structure according to the Parquet Spec. +// +// <list-repetition> group <name> (LIST) { +// repeated group list { +// <element-repetition> <element-type> element; +// } +// } +// +// <list-repetition> can only be optional or required. panics if repeated. +// <element-repetition> can only be optional or required. panics if repeated. 
+func ListOf(n Node, rep parquet.Repetition, fieldID int32) *GroupNode { + if rep == parquet.Repetitions.Repeated || n.RepetitionType() == parquet.Repetitions.Repeated { + panic("parquet: listof repetition and element repetition must not be repeated.") + } + listName := n.Name() + + switch n := n.(type) { + case *PrimitiveNode: + n.name = "element" + case *GroupNode: + n.name = "element" + } + + return NewGroupNodeLogical(listName, rep, FieldList{ + NewGroupNode("list", parquet.Repetitions.Repeated, FieldList{n}, -1)}, Review comment: done ########## File path: go/parquet/schema/helpers.go ########## @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import "github.com/apache/arrow/go/parquet" + +// ListOf is a convenience helper function to create a properly structured +// list structure according to the Parquet Spec. +// +// <list-repetition> group <name> (LIST) { +// repeated group list { +// <element-repetition> <element-type> element; +// } +// } +// +// <list-repetition> can only be optional or required. panics if repeated. +// <element-repetition> can only be optional or required. panics if repeated. 
+func ListOf(n Node, rep parquet.Repetition, fieldID int32) *GroupNode { + if rep == parquet.Repetitions.Repeated || n.RepetitionType() == parquet.Repetitions.Repeated { + panic("parquet: listof repetition and element repetition must not be repeated.") + } + listName := n.Name() + + switch n := n.(type) { + case *PrimitiveNode: + n.name = "element" + case *GroupNode: + n.name = "element" + } + + return NewGroupNodeLogical(listName, rep, FieldList{ + NewGroupNode("list", parquet.Repetitions.Repeated, FieldList{n}, -1)}, + ListLogicalType{}, fieldID) +} + +// MapOf is a convenience helper function to create a properly structured +// parquet map node setup according to the Parquet Spec. +// +// <map-repetition> group <name> (MAP) { +// repeated group key_value { +// required <key-type> key; +// <value-repetition> <value-type> value; +// } +// } +// +// key node will be renamed to "key", value node if not nil will be renamed to "value" +// +// <map-repetition> must be only optional or required. panics if repeated is passed. +// +// the key node *must* be required repetition. panics if optional or repeated +// +// value node can be nil (omitted) or have a repetition of required or optional *only*. +// panics if value node is not nil and has a repetition of repeated. 
+func MapOf(name string, key Node, value Node, mapRep parquet.Repetition, fieldID int32) *GroupNode { + if mapRep == parquet.Repetitions.Repeated { + panic("parquet: map repetition cannot be Repeated") + } + if key.RepetitionType() != parquet.Repetitions.Required { + panic("parquet: map key repetition must be Required") + } + if value != nil { + if value.RepetitionType() == parquet.Repetitions.Repeated { + panic("parquet: map value cannot have repetition Repeated") + } + switch value := value.(type) { + case *PrimitiveNode: + value.name = "value" + case *GroupNode: + value.name = "value" + } + } + + switch key := key.(type) { + case *PrimitiveNode: + key.name = "key" + case *GroupNode: + key.name = "key" + } + + keyval := FieldList{key} + if value != nil { + keyval = append(keyval, value) + } + + return NewGroupNodeLogical(name, mapRep, FieldList{ + NewGroupNode("key_value", parquet.Repetitions.Repeated, keyval, -1), Review comment: done ########## File path: go/parquet/schema/logical_types_test.go ########## @@ -0,0 +1,551 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schema_test + +import ( + "encoding/json" + "testing" + + "github.com/apache/arrow/go/parquet" + "github.com/apache/arrow/go/parquet/schema" + "github.com/stretchr/testify/assert" +) + +func TestConvertedLogicalEquivalences(t *testing.T) { + tests := []struct { + name string + converted schema.ConvertedType + logical schema.LogicalType + expected schema.LogicalType + }{ + {"utf8", schema.ConvertedTypes.UTF8, schema.StringLogicalType{}, schema.StringLogicalType{}}, + {"map", schema.ConvertedTypes.Map, schema.MapLogicalType{}, schema.MapLogicalType{}}, + {"mapkeyval", schema.ConvertedTypes.MapKeyValue, schema.MapLogicalType{}, schema.MapLogicalType{}}, + {"list", schema.ConvertedTypes.List, schema.NewListLogicalType(), schema.NewListLogicalType()}, + {"enum", schema.ConvertedTypes.Enum, schema.EnumLogicalType{}, schema.EnumLogicalType{}}, + {"date", schema.ConvertedTypes.Date, schema.DateLogicalType{}, schema.DateLogicalType{}}, + {"timemilli", schema.ConvertedTypes.TimeMillis, schema.NewTimeLogicalType(true, schema.TimeUnitMillis), &schema.TimeLogicalType{}}, + {"timemicro", schema.ConvertedTypes.TimeMicros, schema.NewTimeLogicalType(true, schema.TimeUnitMicros), &schema.TimeLogicalType{}}, + {"timestampmilli", schema.ConvertedTypes.TimestampMillis, schema.NewTimestampLogicalType(true, schema.TimeUnitMillis), &schema.TimestampLogicalType{}}, + {"timestampmicro", schema.ConvertedTypes.TimestampMicros, schema.NewTimestampLogicalType(true, schema.TimeUnitMicros), &schema.TimestampLogicalType{}}, + {"uint8", schema.ConvertedTypes.Uint8, schema.NewIntLogicalType(8, false), &schema.IntLogicalType{}}, + {"uint16", schema.ConvertedTypes.Uint16, schema.NewIntLogicalType(16, false), &schema.IntLogicalType{}}, + {"uint32", schema.ConvertedTypes.Uint32, schema.NewIntLogicalType(32, false), &schema.IntLogicalType{}}, + {"uint64", schema.ConvertedTypes.Uint64, schema.NewIntLogicalType(64, false), &schema.IntLogicalType{}}, + {"int8", schema.ConvertedTypes.Int8, 
schema.NewIntLogicalType(8, true), &schema.IntLogicalType{}}, Review comment: done ########## File path: go/parquet/schema/node.go ########## @@ -0,0 +1,591 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/apache/thrift/lib/go/thrift" + "golang.org/x/xerrors" +) + +// NodeType describes whether the Node is a Primitive or Group node +type NodeType int + +// the available constants for NodeType +const ( + Primitive NodeType = iota + Group +) + +// Node is the interface for both Group and Primitive Nodes. +// A logical schema type has a name, repetition level, and optionally +// a logical type (ConvertedType in Parquet metadata parlance) Review comment: fixed ########## File path: go/parquet/schema/node.go ########## @@ -0,0 +1,591 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/apache/thrift/lib/go/thrift" + "golang.org/x/xerrors" +) + +// NodeType describes whether the Node is a Primitive or Group node +type NodeType int + +// the available constants for NodeType +const ( + Primitive NodeType = iota + Group +) + +// Node is the interface for both Group and Primitive Nodes. +// A logical schema type has a name, repetition level, and optionally +// a logical type (ConvertedType in Parquet metadata parlance) +type Node interface { + Name() string + Type() NodeType + RepetitionType() parquet.Repetition + ConvertedType() ConvertedType + LogicalType() LogicalType + FieldID() int32 + Parent() Node + SetParent(Node) + Path() string + Equals(Node) bool + Visit(v Visitor) + toThrift() *format.SchemaElement +} + +// Visitor is an interface for creating functionality to walk the schema tree. +// +// A visitor can be passed to the Visit function of a Node in order to walk +// the tree. VisitPre is called the first time a node is encountered. If +// it is a group node, the return is checked and if it is false, the children +// will be skipped. 
+// +// VisitPost is called after visiting any children +type Visitor interface { + VisitPre(Node) bool + VisitPost(Node) +} + +// ColumnPathFromNode walks the parents of the given node to construct it's +// column path +func ColumnPathFromNode(n Node) parquet.ColumnPath { + if n == nil { + return nil + } + + c := make([]string, 0) + + cursor := n + for cursor.Parent() != nil { + c = append(c, cursor.Name()) + cursor = cursor.Parent() + } + + for i := len(c)/2 - 1; i >= 0; i-- { Review comment: added comments ########## File path: go/parquet/schema/reflection.go ########## @@ -0,0 +1,791 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package schema + +import ( + "reflect" + "strconv" + "strings" + + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "golang.org/x/xerrors" +) + +type taggedInfo struct { + Name string + + Type parquet.Type + KeyType parquet.Type + ValueType parquet.Type + + Length int32 + KeyLength int32 + ValueLength int32 + + Scale int32 + KeyScale int32 + ValueScale int32 + + Precision int32 + KeyPrecision int32 + ValuePrecision int32 + + FieldID int32 + KeyFieldID int32 + ValueFieldID int32 + + RepetitionType parquet.Repetition + ValueRepetition parquet.Repetition + + Converted ConvertedType + KeyConverted ConvertedType + ValueConverted ConvertedType + + LogicalFields map[string]string + KeyLogicalFields map[string]string + ValueLogicalFields map[string]string + + LogicalType LogicalType + KeyLogicalType LogicalType + ValueLogicalType LogicalType +} + +func (t *taggedInfo) CopyForKey() (ret taggedInfo) { + ret = *t + ret.Type = t.KeyType + ret.Length = t.KeyLength + ret.Scale = t.KeyScale + ret.Precision = t.KeyPrecision + ret.FieldID = t.KeyFieldID + ret.RepetitionType = parquet.Repetitions.Required + ret.Converted = t.KeyConverted + ret.LogicalType = t.KeyLogicalType + return +} + +func (t *taggedInfo) CopyForValue() (ret taggedInfo) { + ret = *t + ret.Type = t.ValueType + ret.Length = t.ValueLength + ret.Scale = t.ValueScale + ret.Precision = t.ValuePrecision + ret.FieldID = t.ValueFieldID + ret.RepetitionType = t.ValueRepetition + ret.Converted = t.ValueConverted + ret.LogicalType = t.ValueLogicalType + return +} + +func (t *taggedInfo) UpdateLogicalTypes() { + processLogicalType := func(fields map[string]string, precision, scale int32) LogicalType { + t, ok := fields["type"] + if !ok { + return NoLogicalType{} + } + + switch strings.ToLower(t) { + case "string": + return StringLogicalType{} + case "map": + return MapLogicalType{} + case "list": + return ListLogicalType{} + case "enum": + return 
EnumLogicalType{} + case "decimal": + if v, ok := fields["precision"]; ok { + precision = int32FromType(v) + } + if v, ok := fields["scale"]; ok { + scale = int32FromType(v) + } + return NewDecimalLogicalType(precision, scale) + case "date": + return DateLogicalType{} + case "time": + unit, ok := fields["unit"] + if !ok { + panic("must specify unit for time logical type") + } + adjustedToUtc, ok := fields["isadjustedutc"] + if !ok { + adjustedToUtc = "true" + } + return NewTimeLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit))) + case "timestamp": + unit, ok := fields["unit"] + if !ok { + panic("must specify unit for time logical type") + } + adjustedToUtc, ok := fields["isadjustedutc"] + if !ok { + adjustedToUtc = "true" + } + return NewTimestampLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(unit)) + case "integer": + width, ok := fields["bitwidth"] + if !ok { + panic("must specify bitwidth if explicitly setting integer logical type") + } + signed, ok := fields["signed"] + if !ok { + signed = "true" + } + + return NewIntLogicalType(int8(int32FromType(width)), boolFromStr(signed)) + case "null": + return NullLogicalType{} + case "json": + return JSONLogicalType{} + case "bson": + return BSONLogicalType{} + case "uuid": + return UUIDLogicalType{} + default: + panic(xerrors.Errorf("invalid logical type specified: %s", t)) + } + } + + t.LogicalType = processLogicalType(t.LogicalFields, t.Precision, t.Scale) + t.KeyLogicalType = processLogicalType(t.KeyLogicalFields, t.KeyPrecision, t.KeyScale) + t.ValueLogicalType = processLogicalType(t.ValueLogicalFields, t.ValuePrecision, t.ValueScale) +} + +func newTaggedInfo() taggedInfo { + return taggedInfo{ + Type: parquet.Types.Undefined, + KeyType: parquet.Types.Undefined, + ValueType: parquet.Types.Undefined, + RepetitionType: parquet.Repetitions.Undefined, + ValueRepetition: parquet.Repetitions.Undefined, + Converted: ConvertedTypes.NA, + KeyConverted: ConvertedTypes.NA, + 
ValueConverted: ConvertedTypes.NA, + FieldID: -1, + KeyFieldID: -1, + ValueFieldID: -1, + LogicalFields: make(map[string]string), + KeyLogicalFields: make(map[string]string), + ValueLogicalFields: make(map[string]string), + LogicalType: NoLogicalType{}, + KeyLogicalType: NoLogicalType{}, + ValueLogicalType: NoLogicalType{}, + } +} + +var int32FromType = func(v string) int32 { + val, err := strconv.Atoi(v) + if err != nil { + panic(err) + } + return int32(val) +} + +var boolFromStr = func(v string) bool { + val, err := strconv.ParseBool(v) + if err != nil { + panic(err) + } + return val +} + +func infoFromTags(f reflect.StructTag) *taggedInfo { + typeFromStr := func(v string) parquet.Type { + t, err := format.TypeFromString(strings.ToUpper(v)) + if err != nil { + panic(xerrors.Errorf("invalid type specified: %s", v)) + } + return parquet.Type(t) + } + + repFromStr := func(v string) parquet.Repetition { + r, err := format.FieldRepetitionTypeFromString(strings.ToUpper(v)) + if err != nil { + panic(err) + } + return parquet.Repetition(r) + } + + convertedFromStr := func(v string) ConvertedType { + c, err := format.ConvertedTypeFromString(strings.ToUpper(v)) + if err != nil { + panic(err) + } + return ConvertedType(c) + } + + if ptags, ok := f.Lookup("parquet"); ok { + info := newTaggedInfo() + for _, tag := range strings.Split(strings.Replace(ptags, "\t", "", -1), ",") { + tag = strings.TrimSpace(tag) + kv := strings.SplitN(tag, "=", 2) + key := strings.TrimSpace(strings.ToLower(kv[0])) + value := strings.TrimSpace(kv[1]) + + switch key { + case "name": + info.Name = value + case "type": + info.Type = typeFromStr(value) + case "keytype": + info.KeyType = typeFromStr(value) + case "valuetype": + info.ValueType = typeFromStr(value) + case "length": + info.Length = int32FromType(value) + case "keylength": + info.KeyLength = int32FromType(value) + case "valuelength": + info.ValueLength = int32FromType(value) + case "scale": + info.Scale = int32FromType(value) + case 
"keyscale": + info.KeyScale = int32FromType(value) + case "valuescale": + info.ValueScale = int32FromType(value) + case "precision": + info.Precision = int32FromType(value) + case "keyprecision": + info.KeyPrecision = int32FromType(value) + case "valueprecision": + info.ValuePrecision = int32FromType(value) + case "fieldid": + info.FieldID = int32FromType(value) + case "keyfieldid": + info.KeyFieldID = int32FromType(value) + case "valuefieldid": + info.ValueFieldID = int32FromType(value) + case "repetition": + info.RepetitionType = repFromStr(value) + case "valuerepetition": + info.ValueRepetition = repFromStr(value) + case "converted": + info.Converted = convertedFromStr(value) + case "keyconverted": + info.KeyConverted = convertedFromStr(value) + case "valueconverted": + info.ValueConverted = convertedFromStr(value) + case "logical": + info.LogicalFields["type"] = value + case "keylogical": + info.KeyLogicalFields["type"] = value + case "valuelogical": + info.ValueLogicalFields["type"] = value + default: + switch { + case strings.HasPrefix(key, "logical."): + info.LogicalFields[strings.TrimPrefix(key, "logical.")] = value + case strings.HasPrefix(key, "keylogical."): + info.KeyLogicalFields[strings.TrimPrefix(key, "keylogical.")] = value + case strings.HasPrefix(key, "valuelogical."): + info.ValueLogicalFields[strings.TrimPrefix(key, "valuelogical.")] = value + } + } + } + info.UpdateLogicalTypes() + return &info + } + return nil +} + +func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info *taggedInfo) Node { Review comment: added comments to the function and the rest of the function body ########## File path: go/parquet/schema/schema_element_test.go ########## @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "testing" + + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +type schemaElementConstruction struct { + node Node + element *format.SchemaElement + name string + expectConverted bool + converted ConvertedType + expectLogical bool + checkLogical func(*format.SchemaElement) bool +} + +type decimalSchemaElementConstruction struct { + schemaElementConstruction + precision int + scale int +} + +type temporalSchemaElementConstruction struct { + schemaElementConstruction + adjusted bool + unit TimeUnitType + getUnit func(*format.SchemaElement) *format.TimeUnit +} + +type intSchemaElementConstruction struct { + schemaElementConstruction + width int8 + signed bool +} + +type legacySchemaElementConstructArgs struct { + name string + physical parquet.Type + len int + expectConverted bool + converted ConvertedType + expectLogical bool + checkLogical func(*format.SchemaElement) bool +} + +type schemaElementConstructArgs struct { + name string + logical LogicalType + physical parquet.Type + len int + expectConverted bool + converted ConvertedType + expectLogical bool + checkLogical func(*format.SchemaElement) bool +} +type SchemaElementConstructionSuite struct { + suite.Suite +} + +func (s *SchemaElementConstructionSuite) 
reconstruct(c schemaElementConstructArgs) *schemaElementConstruction { + ret := &schemaElementConstruction{ + node: NewPrimitiveNodeLogical(c.name, parquet.Repetitions.Required, c.logical, c.physical, c.len, -1), + name: c.name, + expectConverted: c.expectConverted, + converted: c.converted, + expectLogical: c.expectLogical, + checkLogical: c.checkLogical, + } + ret.element = ret.node.toThrift() + return ret +} + +func (s *SchemaElementConstructionSuite) legacyReconstruct(c legacySchemaElementConstructArgs) *schemaElementConstruction { + ret := &schemaElementConstruction{ + node: NewPrimitiveNodeConverted(c.name, parquet.Repetitions.Required, c.physical, c.converted, c.len, 0, 0, -1), + name: c.name, + expectConverted: c.expectConverted, + converted: c.converted, + expectLogical: c.expectLogical, + checkLogical: c.checkLogical, + } + ret.element = ret.node.toThrift() + return ret +} + +func (s *SchemaElementConstructionSuite) inspect(c *schemaElementConstruction) { + if c.expectConverted { + s.True(c.element.IsSetConvertedType()) + s.Equal(c.converted, ConvertedType(*c.element.ConvertedType)) + } else { + s.False(c.element.IsSetConvertedType()) + } + if c.expectLogical { + s.True(c.element.IsSetLogicalType()) + s.True(c.checkLogical(c.element)) + } else { + s.False(c.element.IsSetLogicalType()) + } +} + +func (s *SchemaElementConstructionSuite) TestSimple() { + checkNone := func(*format.SchemaElement) bool { return true } + + tests := []struct { + name string + args *schemaElementConstructArgs + legacy *legacySchemaElementConstructArgs + }{ + {"string", &schemaElementConstructArgs{ + "string", StringLogicalType{}, parquet.Types.ByteArray, -1, true, ConvertedTypes.UTF8, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetSTRING() }, + }, nil}, + {"enum", &schemaElementConstructArgs{ + "enum", EnumLogicalType{}, parquet.Types.ByteArray, -1, true, ConvertedTypes.Enum, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetENUM() }, + 
}, nil}, + {"date", &schemaElementConstructArgs{ + "date", DateLogicalType{}, parquet.Types.Int32, -1, true, ConvertedTypes.Date, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetDATE() }, + }, nil}, + {"interval", &schemaElementConstructArgs{ + "interval", IntervalLogicalType{}, parquet.Types.FixedLenByteArray, 12, true, ConvertedTypes.Interval, false, + checkNone, + }, nil}, + {"null", &schemaElementConstructArgs{ + "null", NullLogicalType{}, parquet.Types.Double, -1, false, ConvertedTypes.NA, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetUNKNOWN() }, + }, nil}, + {"json", &schemaElementConstructArgs{ + "json", JSONLogicalType{}, parquet.Types.ByteArray, -1, true, ConvertedTypes.JSON, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetJSON() }, + }, nil}, + {"bson", &schemaElementConstructArgs{ + "bson", BSONLogicalType{}, parquet.Types.ByteArray, -1, true, ConvertedTypes.BSON, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetBSON() }, + }, nil}, + {"uuid", &schemaElementConstructArgs{ + "uuid", UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16, false, ConvertedTypes.NA, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetUUID() }, + }, nil}, + {"none", &schemaElementConstructArgs{ + "none", NoLogicalType{}, parquet.Types.Int64, -1, false, ConvertedTypes.NA, false, + checkNone, + }, nil}, + {"unknown", &schemaElementConstructArgs{ + "unknown", UnknownLogicalType{}, parquet.Types.Int64, -1, true, ConvertedTypes.NA, false, + checkNone, + }, nil}, + {"timestamp_ms", nil, &legacySchemaElementConstructArgs{ + "timestamp_ms", parquet.Types.Int64, -1, true, ConvertedTypes.TimestampMillis, false, checkNone}}, + {"timestamp_us", nil, &legacySchemaElementConstructArgs{ + "timestamp_us", parquet.Types.Int64, -1, true, ConvertedTypes.TimestampMicros, false, checkNone}}, + } + for _, tt := range tests { + s.Run(tt.name, func() { + var sc *schemaElementConstruction 
+ if tt.args != nil { + sc = s.reconstruct(*tt.args) + } else { + sc = s.legacyReconstruct(*tt.legacy) + } + s.Equal(tt.name, sc.element.Name) + s.inspect(sc) + }) + } +} + +func (s *SchemaElementConstructionSuite) reconstructDecimal(c schemaElementConstructArgs) *decimalSchemaElementConstruction { + ret := s.reconstruct(c) + dec := c.logical.(*DecimalLogicalType) + return &decimalSchemaElementConstruction{*ret, int(dec.Precision()), int(dec.Scale())} +} + +func (s *SchemaElementConstructionSuite) inspectDecimal(d *decimalSchemaElementConstruction) { + s.inspect(&d.schemaElementConstruction) + s.EqualValues(d.precision, d.element.GetPrecision()) + s.EqualValues(d.scale, d.element.GetScale()) + s.EqualValues(d.precision, d.element.LogicalType.DECIMAL.Precision) + s.EqualValues(d.scale, d.element.LogicalType.DECIMAL.Scale) +} + +func (s *SchemaElementConstructionSuite) TestDecimal() { + checkDecimal := func(p *format.SchemaElement) bool { return p.LogicalType.IsSetDECIMAL() } + + tests := []schemaElementConstructArgs{ + {"decimal16_6", NewDecimalLogicalType(16, 6), parquet.Types.Int64, -1, true, ConvertedTypes.Decimal, true, checkDecimal}, + {"decimal1_0", NewDecimalLogicalType(1, 0), parquet.Types.Int32, -1, true, ConvertedTypes.Decimal, true, checkDecimal}, + {"decimal10", NewDecimalLogicalType(10, 0), parquet.Types.Int64, -1, true, ConvertedTypes.Decimal, true, checkDecimal}, Review comment: added ########## File path: go/parquet/schema/schema_flatten_test.go ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "testing" + + "github.com/apache/arrow/go/parquet" + format "github.com/apache/arrow/go/parquet/internal/gen-go/parquet" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" +) + +func NewPrimitive(name string, repetition format.FieldRepetitionType, typ format.Type, fieldID int32) *format.SchemaElement { + ret := &format.SchemaElement{ + Name: name, + RepetitionType: format.FieldRepetitionTypePtr(repetition), + Type: format.TypePtr(typ), + } + if fieldID >= 0 { + ret.FieldID = &fieldID + } + return ret +} + +func NewGroup(name string, repetition format.FieldRepetitionType, numChildren, fieldID int32) *format.SchemaElement { + ret := &format.SchemaElement{ + Name: name, + RepetitionType: format.FieldRepetitionTypePtr(repetition), + NumChildren: &numChildren, + } + if fieldID >= 0 { + ret.FieldID = &fieldID + } + return ret +} + +type SchemaFlattenSuite struct { + suite.Suite + + name string +} + +func (s *SchemaFlattenSuite) SetupSuite() { + s.name = "parquet_schema" +} + +func (s *SchemaFlattenSuite) TestDecimalMetadata() { + group := NewGroupNodeConverted("group", parquet.Repetitions.Repeated, FieldList{ + NewPrimitiveNodeConverted("decimal", parquet.Repetitions.Required, parquet.Types.Int64, ConvertedTypes.Decimal, 0, 8, 4, -1), Review comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
