zeroshade commented on code in PR #558:
URL: https://github.com/apache/arrow-go/pull/558#discussion_r2771557250
##########
arrow/array/encoded.go:
##########
@@ -386,6 +386,15 @@ func (b *RunEndEncodedBuilder) ContinueRun(n uint64) {
b.addLength(n)
}
+func (b *RunEndEncodedBuilder) UnsafeAppendBoolToBitmap(v bool) {
+ if !v {
+ b.finishRun()
+ }
Review Comment:
This seems incorrect: it could be a run of nulls, couldn't it? And wouldn't
the run itself be handled by the value append? I don't think we need to call
`b.finishRun` here.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
Review Comment:
`arrow.IsInteger` is unnecessary here: by definition, only integer types are
allowed as the `IndexType` of a dictionary type. Anything else is invalid
anyway, so there's no need to check that.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but a Arrow
IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would always
fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit,
offsetType arrow.DataType, ok bool) {
+ unit = arrow.Second
+ offsetType = arrow.PrimitiveTypes.Int16
+ ok = false
+
+ st, compat := storageType.(*arrow.StructType)
+ if !compat || st.NumFields() != 2 {
+ return
+ }
+
+ if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat &&
ts.TimeZone == "UTC" {
+ unit = ts.TimeUnit()
+ } else {
+ return
+ }
+
+ maybeOffset := st.Field(1)
+ offsetType = maybeOffset.Type
+
+ ok = st.Field(0).Name == "timestamp" &&
+ !st.Field(0).Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ isOffsetTypeOk(offsetType) &&
+ !maybeOffset.Nullable
+ return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit,
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+type DictIndexType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+ offsetType := arrow.DictionaryType{
+ IndexType: arrow.DataType(index),
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+ // SAFETY: This should never error as DictIndexType is always a valid
index type
+
+ return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
Review Comment:
Update the comment to remove this sentence, since the generic type constraint
guarantees that this error can't occur.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but a Arrow
IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would always
fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit,
offsetType arrow.DataType, ok bool) {
+ unit = arrow.Second
+ offsetType = arrow.PrimitiveTypes.Int16
+ ok = false
+
+ st, compat := storageType.(*arrow.StructType)
+ if !compat || st.NumFields() != 2 {
+ return
+ }
+
+ if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat &&
ts.TimeZone == "UTC" {
+ unit = ts.TimeUnit()
+ } else {
+ return
+ }
+
+ maybeOffset := st.Field(1)
+ offsetType = maybeOffset.Type
+
+ ok = st.Field(0).Name == "timestamp" &&
+ !st.Field(0).Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ isOffsetTypeOk(offsetType) &&
+ !maybeOffset.Nullable
+ return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit,
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+type DictIndexType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+ offsetType := arrow.DictionaryType{
+ IndexType: arrow.DataType(index),
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+ // SAFETY: This should never error as DictIndexType is always a valid
index type
+
+ return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
Review Comment:
This and `DictIndexType` are identical. Instead of duplicating the constraint,
we should either create a single type constraint or embed one in the other:
```go
type TimestampWithOffsetRunEndsType interface {
DictIndexType
}
```
My personal preference would be to have a single type, but I'm not averse to
keeping the two separate ones if necessary.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but a Arrow
IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would always
fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit,
offsetType arrow.DataType, ok bool) {
+ unit = arrow.Second
+ offsetType = arrow.PrimitiveTypes.Int16
+ ok = false
+
+ st, compat := storageType.(*arrow.StructType)
+ if !compat || st.NumFields() != 2 {
+ return
+ }
+
+ if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat &&
ts.TimeZone == "UTC" {
+ unit = ts.TimeUnit()
+ } else {
+ return
+ }
+
+ maybeOffset := st.Field(1)
+ offsetType = maybeOffset.Type
+
+ ok = st.Field(0).Name == "timestamp" &&
+ !st.Field(0).Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ isOffsetTypeOk(offsetType) &&
+ !maybeOffset.Nullable
+ return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit,
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+type DictIndexType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
Review Comment:
Remove this line from the comment, since the generic type constraint prevents
this from happening.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but
a Arrow IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would
always fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit,
arrow.DataType, bool) {
+ timeUnit := arrow.Second
+ offsetType := arrow.PrimitiveTypes.Int16
+ switch t := storageType.(type) {
+ case *arrow.StructType:
+ if t.NumFields() != 2 {
+ return timeUnit, offsetType, false
+ }
+
+ maybeTimestamp := t.Field(0)
+ maybeOffset := t.Field(1)
+
+ timestampOk := false
+ switch timestampType := maybeTimestamp.Type.(type) {
+ case *arrow.TimestampType:
+ if timestampType.TimeZone == "UTC" {
+ timestampOk = true
+ timeUnit = timestampType.TimeUnit()
+ }
+ default:
+ }
+
+ offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+ ok := maybeTimestamp.Name == "timestamp" &&
+ timestampOk &&
+ !maybeTimestamp.Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ offsetOk &&
+ !maybeOffset.Nullable
+
+ return timeUnit, maybeOffset.Type, ok
+ default:
+ return timeUnit, offsetType, false
+ }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit)
*TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.DictionaryType{
+ IndexType: index,
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+ if !offsetType.ValidRunEndsType(runEnds) {
+ return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s",
runEnds))
+ }
+
+ return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+ return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+ return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`,
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data
string) (arrow.ExtensionType, error) {
+ timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+ if !ok {
+ return nil, fmt.Errorf("invalid storage type for
TimestampWithOffsetType: %s", storageType.Name())
+ }
+
+ v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+ // SAFETY: the offsetType has already been checked by
isDataTypeCompatible, so we can ignore the error
+
+ return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType)
bool {
+ return b.ExtensionName() == other.ExtensionName()
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+ return
b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator)
array.Builder {
+ v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(),
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This will never error as Int16 is always a valid type for
the offset field
+
+ return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+ array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+ var o strings.Builder
+ o.WriteString("[")
+ for i := 0; i < a.Len(); i++ {
+ if i > 0 {
+ o.WriteString(" ")
+ }
+ switch {
+ case a.IsNull(i):
+ o.WriteString(array.NullValueStr)
+ default:
+ fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+ }
+ }
+ o.WriteString("]")
+ return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16,
unit arrow.TimeUnit) time.Time {
+ hours := offsetMinutes / 60
+ minutes := offsetMinutes % 60
+ if minutes < 0 {
+ minutes = -minutes
+ }
+
+ loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes),
int(offsetMinutes)*60)
+ return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp,
int16) {
+ // naive "bitwise" conversion to UTC, keeping the underlying date the
same
+ utc := t.UTC()
+ naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(),
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+ // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit
value before
+ // this function. Otherwise, ignoring this error is not safe.
+ timestamp, _ := arrow.TimestampFromTime(utc, unit)
+ return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp,
int16, arrow.TimeUnit) {
+ structs := a.Storage().(*array.Struct)
+
+ timestampField := structs.Field(0)
+ timestamps := timestampField.(*array.Timestamp)
+
+ timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+ utcTimestamp := timestamps.Value(i)
+
+ var offsetMinutes int16
+
+ switch offsets := structs.Field(1).(type) {
+ case *array.Int16:
+ offsetMinutes = offsets.Value(i)
+ case *array.Dictionary:
+ offsetMinutes =
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+ case *array.RunEndEncoded:
+ offsetMinutes =
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+ }
+
+ return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+ if a.IsNull(i) {
+ return time.Unix(0, 0)
+ }
+ utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+ return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array and calls the callback with the timestamp at each
position. If it's null,
+// the timestamp will be nil.
+//
+// This will iterate using the fastest method given the underlying storage
array
+func (a* TimestampWithOffsetArray) iterValues(callback func(i int,
utcTimestamp *time.Time)) {
+ structs := a.Storage().(*array.Struct)
+ offsets := structs.Field(1)
+ if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee {
+ timestampField := structs.Field(0)
+ timeUnit :=
timestampField.DataType().(*arrow.TimestampType).Unit
+ timestamps := timestampField.(*array.Timestamp)
+
+ offsetValues := reeOffsets.Values().(*array.Int16)
+ offsetPhysicalIdx := 0
+
+ var getRunEnd (func(int) int)
+ switch arr := reeOffsets.RunEndsArr().(type) {
+ case *array.Int16:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ case *array.Int32:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ case *array.Int64:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ }
+
+ for i := 0; i < a.Len(); i++ {
+ if i >= getRunEnd(offsetPhysicalIdx) {
+ offsetPhysicalIdx += 1
+ }
+
+ timestamp := (*time.Time)(nil)
+ if a.IsValid(i) {
+ utcTimestamp := timestamps.Value(i)
+ offsetMinutes :=
offsetValues.Value(offsetPhysicalIdx)
+ v := timeFromFieldValues(utcTimestamp,
offsetMinutes, timeUnit)
+ timestamp = &v
+ }
+
+ callback(i, timestamp)
+ }
+ } else {
+ for i := 0; i < a.Len(); i++ {
+ timestamp := (*time.Time)(nil)
+ if a.IsValid(i) {
+ utcTimestamp, offsetMinutes, timeUnit :=
a.rawValueUnsafe(i)
+ v := timeFromFieldValues(utcTimestamp,
offsetMinutes, timeUnit)
+ timestamp = &v
+ }
+
+ callback(i, timestamp)
+ }
+ }
+}
+
+
+func (a *TimestampWithOffsetArray) Values() []time.Time {
+ values := make([]time.Time, a.Len())
+ a.iterValues(func(i int, timestamp *time.Time) {
+ if timestamp == nil {
+ values[i] = time.Unix(0, 0)
+ } else {
+ values[i] = *timestamp
+ }
+ })
+ return values
+}
+
+func (a *TimestampWithOffsetArray) ValueStr(i int) string {
+ switch {
+ case a.IsNull(i):
+ return array.NullValueStr
+ default:
+ return a.Value(i).String()
+ }
+}
+
+func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) {
+ values := make([]interface{}, a.Len())
+ a.iterValues(func(i int, timestamp *time.Time) {
+ values[i] = timestamp
+ })
+ return json.Marshal(values)
Review Comment:
Using `0` as a sentinel for `null` seems incorrect, as it means you can't
actually encode a non-null value of 1970-01-01 and get the correct output.
Since we need to be able to distinguish a *value* of `0` from a `null`,
`iterValues()` needs to either yield `*time.Time` values or be an
`iter.Seq2[time.Time, bool]` where the bool indicates validity (if the bool is
false, the value is null).
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but
a Arrow IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would
always fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit,
arrow.DataType, bool) {
+ timeUnit := arrow.Second
+ offsetType := arrow.PrimitiveTypes.Int16
+ switch t := storageType.(type) {
+ case *arrow.StructType:
+ if t.NumFields() != 2 {
+ return timeUnit, offsetType, false
+ }
+
+ maybeTimestamp := t.Field(0)
+ maybeOffset := t.Field(1)
+
+ timestampOk := false
+ switch timestampType := maybeTimestamp.Type.(type) {
+ case *arrow.TimestampType:
+ if timestampType.TimeZone == "UTC" {
+ timestampOk = true
+ timeUnit = timestampType.TimeUnit()
+ }
+ default:
+ }
+
+ offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+ ok := maybeTimestamp.Name == "timestamp" &&
+ timestampOk &&
+ !maybeTimestamp.Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ offsetOk &&
+ !maybeOffset.Nullable
+
+ return timeUnit, maybeOffset.Type, ok
+ default:
+ return timeUnit, offsetType, false
+ }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit)
*TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.DictionaryType{
+ IndexType: index,
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+ if !offsetType.ValidRunEndsType(runEnds) {
+ return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s",
runEnds))
+ }
+
+ return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+ return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+ return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`,
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data
string) (arrow.ExtensionType, error) {
+ timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+ if !ok {
+ return nil, fmt.Errorf("invalid storage type for
TimestampWithOffsetType: %s", storageType.Name())
+ }
+
+ v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+ // SAFETY: the offsetType has already been checked by
isDataTypeCompatible, so we can ignore the error
+
+ return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType)
bool {
+ return b.ExtensionName() == other.ExtensionName()
+}
Review Comment:
I agree with this; we should update those builders to return an error or panic
if called, and add docstring comments to explain that.
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but a Arrow
IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would always
fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit,
offsetType arrow.DataType, ok bool) {
+ unit = arrow.Second
+ offsetType = arrow.PrimitiveTypes.Int16
+ ok = false
+
+ st, compat := storageType.(*arrow.StructType)
+ if !compat || st.NumFields() != 2 {
+ return
+ }
+
+ if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat &&
ts.TimeZone == "UTC" {
+ unit = ts.TimeUnit()
+ } else {
+ return
+ }
+
+ maybeOffset := st.Field(1)
+ offsetType = maybeOffset.Type
+
+ ok = st.Field(0).Name == "timestamp" &&
+ !st.Field(0).Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ isOffsetTypeOk(offsetType) &&
+ !maybeOffset.Nullable
+ return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit,
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+type DictIndexType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+ offsetType := arrow.DictionaryType{
+ IndexType: arrow.DataType(index),
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+ // SAFETY: This should never error as DictIndexType is always a valid
index type
+
+ return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded[E
TimestampWithOffsetRunEndsType](unit arrow.TimeUnit, runEnds E)
*TimestampWithOffsetType {
+ offsetType := arrow.RunEndEncodedOf(arrow.DataType(runEnds),
arrow.PrimitiveTypes.Int16)
+
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, offsetType)
+ // SAFETY: This should never error as TimestampWithOffsetRunEndsType is
always a valid run ends type
+
+ return v
+
+}
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+ return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+ return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`,
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data
string) (arrow.ExtensionType, error) {
+ timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+ if !ok {
+ return nil, fmt.Errorf("invalid storage type for
TimestampWithOffsetType: %s", storageType.Name())
+ }
+
+ return NewTimestampWithOffsetTypeCustomOffset(timeUnit, offsetType)
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType)
bool {
+ return b.ExtensionName() == other.ExtensionName() &&
+ arrow.TypeEqual(b.StorageType(), other.StorageType())
+}
+
+func (b *TimestampWithOffsetType) OffsetType() arrow.DataType {
+ return b.ExtensionBase.Storage.(*arrow.StructType).Field(1).Type
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+ return
b.ExtensionBase.Storage.(*arrow.StructType).Field(0).Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator)
array.Builder {
+ v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), b.OffsetType())
+ return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+ array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+ var o strings.Builder
+ o.WriteString("[")
+ for i := 0; i < a.Len(); i++ {
+ if i > 0 {
+ o.WriteString(" ")
+ }
+ switch {
+ case a.IsNull(i):
+ o.WriteString(array.NullValueStr)
+ default:
+ fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+ }
+ }
+ o.WriteString("]")
+ return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16,
unit arrow.TimeUnit) time.Time {
+ hours := offsetMinutes / 60
+ minutes := offsetMinutes % 60
+ if minutes < 0 {
+ minutes = -minutes
+ }
+
+ loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes),
int(offsetMinutes)*60)
+ return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp,
int16) {
+ // naive "bitwise" conversion to UTC, keeping the underlying date the
same
+ utc := t.UTC()
+ naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(),
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+
+ // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit
value before
+ // this function. Otherwise, ignoring this error is not safe.
+ timestamp, _ := arrow.TimestampFromTime(utc, unit)
+ return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp,
int16, arrow.TimeUnit) {
+ structs := a.Storage().(*array.Struct)
+
+ timestampField := structs.Field(0)
+ timestamps := timestampField.(*array.Timestamp)
+
+ timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+ utcTimestamp := timestamps.Value(i)
+
+ var offsetMinutes int16
+
+ switch offsets := structs.Field(1).(type) {
+ case *array.Int16:
+ offsetMinutes = offsets.Value(i)
+ case *array.Dictionary:
+ offsetMinutes =
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+ case *array.RunEndEncoded:
+ offsetMinutes =
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+ }
+
+ return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+ if a.IsNull(i) {
+ return time.Unix(0, 0)
+ }
+ utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+ return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array returning the timestamp at each position.
+//
+// If the timestamp is null, the returned time will be the unix epoch.
Review Comment:
What happens if the value is non-null but is the unix epoch? This iteration
would be unable to represent that case.
##########
arrow/extensions/timestamp_with_offset_test.go:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions_test
+
+import (
+ "bytes"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/extensions"
+ "github.com/apache/arrow-go/v18/arrow/ipc"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+var testTimeUnit = arrow.Microsecond
+
+var testDate0 = time.Date(2025, 01, 01, 00, 00, 00, 00,
time.FixedZone("UTC+00:00", 0))
+
+var testZone1 = time.FixedZone("UTC-08:30", -8*60*60 -30*60)
+var testDate1 = testDate0.In(testZone1)
+
+var testZone2 = time.FixedZone("UTC+11:00", +11*60*60)
+var testDate2 = testDate0.In(testZone2)
+
+func dict(index arrow.DataType) arrow.DataType {
+ return &arrow.DictionaryType{
+ IndexType: index,
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+}
+
+func ree(runEnds arrow.DataType) arrow.DataType {
+ v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+ v.ValueNullable = false
+ return v
+}
+
+// All tests use this in a for loop to make sure everything works for every
possible
+// encoding of offsets (primitive, dictionary, run-end)
+var allAllowedOffsetTypes = []arrow.DataType{
+ // primitive offsetType
+ arrow.PrimitiveTypes.Int16,
+
+ // dict-encoded offsetType
+ dict(arrow.PrimitiveTypes.Uint8),
+ dict(arrow.PrimitiveTypes.Uint16),
+ dict(arrow.PrimitiveTypes.Uint32),
+ dict(arrow.PrimitiveTypes.Uint64),
+ dict(arrow.PrimitiveTypes.Int8),
+ dict(arrow.PrimitiveTypes.Int16),
+ dict(arrow.PrimitiveTypes.Int32),
+ dict(arrow.PrimitiveTypes.Int64),
+
+ // run-end encoded offsetType
+ ree(arrow.PrimitiveTypes.Int16),
+ ree(arrow.PrimitiveTypes.Int32),
+ ree(arrow.PrimitiveTypes.Int64),
+}
+
+func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) {
+ typ := extensions.NewTimestampWithOffsetType(testTimeUnit)
+
+ assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+ assert.True(t, typ.ExtensionEquals(typ))
+
+ assert.True(t, arrow.TypeEqual(typ, typ))
+ assert.True(t, arrow.TypeEqual(
+ arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: testTimeUnit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: arrow.PrimitiveTypes.Int16,
+ Nullable: false,
+ },
+ ),
+ typ.StorageType()))
+
+ assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func assertDictBasics[I extensions.DictIndexType](t *testing.T, indexType I) {
+ typ :=
extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, indexType)
+
+ assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+ assert.True(t, typ.ExtensionEquals(typ))
+
+ assert.True(t, arrow.TypeEqual(typ, typ))
+ assert.True(t, arrow.TypeEqual(
+ arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: testTimeUnit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: dict(arrow.DataType(indexType)),
+ Nullable: false,
+ },
+ ),
+ typ.StorageType()))
+
+ assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) {
+ assertDictBasics(t, &arrow.Uint8Type{})
+ assertDictBasics(t, &arrow.Uint16Type{})
+ assertDictBasics(t, &arrow.Uint32Type{})
+ assertDictBasics(t, &arrow.Uint64Type{})
+ assertDictBasics(t, &arrow.Int8Type{})
+ assertDictBasics(t, &arrow.Int16Type{})
+ assertDictBasics(t, &arrow.Int32Type{})
+ assertDictBasics(t, &arrow.Int64Type{})
+}
+
+func assertReeBasics[E extensions.TimestampWithOffsetRunEndsType](t
*testing.T, runEndsType E) {
+ typ := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit,
runEndsType)
+
+ assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+ assert.True(t, typ.ExtensionEquals(typ))
+
+ assert.True(t, arrow.TypeEqual(typ, typ))
+ assert.True(t, arrow.TypeEqual(
+ arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: testTimeUnit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: ree(arrow.DataType(runEndsType)),
+ Nullable: false,
+ },
+ ),
+ typ.StorageType()))
+
+ assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) {
+ assertReeBasics(t, &arrow.Int16Type{})
+ assertReeBasics(t, &arrow.Int32Type{})
+ assertReeBasics(t, &arrow.Int64Type{})
+}
+
+func TestTimestampWithOffsetEquals(t *testing.T) {
+ // Completely different types are not equal
+ assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewBool8Type()))
+
+ // Different time units are not equal
+ // assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Microsecond)))
+ // assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Second)))
+ // assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Second)))
+ //
+ // // Different underlying storage type is not equal
+ // assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetTypeDictionaryEncoded(arrow.Microsecond,
&arrow.Int16Type{})))
+ // assert.False(t,
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetTypeRunEndEncoded(arrow.Microsecond,
&arrow.Int16Type{})))
+ // assert.False(t,
extensions.NewTimestampWithOffsetTypeDictionaryEncoded(arrow.Microsecond,
&arrow.Int16Type{}).ExtensionEquals(extensions.NewTimestampWithOffsetTypeRunEndEncoded(arrow.Microsecond,
&arrow.Int16Type{})))
+ //
+ // // Dict-encoding key type is not equal
Review Comment:
Why are these all commented out?
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "math"
+ "reflect"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but
a Arrow IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would
always fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit,
arrow.DataType, bool) {
+ timeUnit := arrow.Second
+ offsetType := arrow.PrimitiveTypes.Int16
+ switch t := storageType.(type) {
+ case *arrow.StructType:
+ if t.NumFields() != 2 {
+ return timeUnit, offsetType, false
+ }
+
+ maybeTimestamp := t.Field(0)
+ maybeOffset := t.Field(1)
+
+ timestampOk := false
+ switch timestampType := maybeTimestamp.Type.(type) {
+ case *arrow.TimestampType:
+ if timestampType.TimeZone == "UTC" {
+ timestampOk = true
+ timeUnit = timestampType.TimeUnit()
+ }
+ default:
+ }
+
+ offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+ ok := maybeTimestamp.Name == "timestamp" &&
+ timestampOk &&
+ !maybeTimestamp.Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ offsetOk &&
+ !maybeOffset.Nullable
+
+ return timeUnit, maybeOffset.Type, ok
+ default:
+ return timeUnit, offsetType, false
+ }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit)
*TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.DictionaryType{
+ IndexType: index,
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds
arrow.DataType) (*TimestampWithOffsetType, error) {
+ offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+ if !offsetType.ValidRunEndsType(runEnds) {
+ return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s",
runEnds))
+ }
+
+ return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+ return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+ return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`,
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data
string) (arrow.ExtensionType, error) {
+ timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+ if !ok {
+ return nil, fmt.Errorf("invalid storage type for
TimestampWithOffsetType: %s", storageType.Name())
+ }
+
+ v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+ // SAFETY: the offsetType has already been checked by
isDataTypeCompatible, so we can ignore the error
+
+ return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType)
bool {
+ return b.ExtensionName() == other.ExtensionName()
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+ return
b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator)
array.Builder {
+ v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(),
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This will never error as Int16 is always a valid type for
the offset field
+
+ return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+ array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+ var o strings.Builder
+ o.WriteString("[")
+ for i := 0; i < a.Len(); i++ {
+ if i > 0 {
+ o.WriteString(" ")
+ }
+ switch {
+ case a.IsNull(i):
+ o.WriteString(array.NullValueStr)
+ default:
+ fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+ }
+ }
+ o.WriteString("]")
+ return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16,
unit arrow.TimeUnit) time.Time {
+ hours := offsetMinutes / 60
+ minutes := offsetMinutes % 60
+ if minutes < 0 {
+ minutes = -minutes
+ }
+
+ loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes),
int(offsetMinutes)*60)
+ return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp,
int16) {
+ // naive "bitwise" conversion to UTC, keeping the underlying date the
same
+ utc := t.UTC()
+ naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(),
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
Review Comment:
Can you give me an example where it messes up the values?
Just to verify, the goal here is to take `t` and provide the same *instant
of time* in UTC while figuring out what the offset was in minutes for the
timezone, yes? E.g., given `2009-11-10 15:00:00 -0800 PST`, this should return
`2009-11-10 23:00:00 +0000 UTC` and `-480`, correct?
##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+ "errors"
+ "fmt"
+ "iter"
+ "math"
+ "reflect"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+ arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+ switch offsetType := offsetType.(type) {
+ case *arrow.Int16Type:
+ return true
+ case *arrow.DictionaryType:
+ return arrow.IsInteger(offsetType.IndexType.ID()) &&
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+ case *arrow.RunEndEncodedType:
+ return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+ arrow.TypeEqual(offsetType.Encoded(),
arrow.PrimitiveTypes.Int16)
+ // FIXME: Technically this should be non-nullable, but a Arrow
IPC does not deserialize
+ // ValueNullable properly, so enforcing this here would always
fail when reading from an IPC
+ // stream
+ // !offsetType.ValueNullable
+ default:
+ return false
+ }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit,
offsetType arrow.DataType, ok bool) {
+ unit = arrow.Second
+ offsetType = arrow.PrimitiveTypes.Int16
+ ok = false
+
+ st, compat := storageType.(*arrow.StructType)
+ if !compat || st.NumFields() != 2 {
+ return
+ }
+
+ if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat &&
ts.TimeZone == "UTC" {
+ unit = ts.TimeUnit()
+ } else {
+ return
+ }
+
+ maybeOffset := st.Field(1)
+ offsetType = maybeOffset.Type
+
+ ok = st.Field(0).Name == "timestamp" &&
+ !st.Field(0).Nullable &&
+ maybeOffset.Name == "offset_minutes" &&
+ isOffsetTypeOk(offsetType) &&
+ !maybeOffset.Nullable
+ return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit,
arrow.PrimitiveTypes.Int16)
+ // SAFETY: This should never error as Int16 is always a valid offset
type
+
+ return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType
arrow.DataType) (*TimestampWithOffsetType, error) {
+ if !isOffsetTypeOk(offsetType) {
+ return nil, errors.New(fmt.Sprintf("Invalid offset type %s",
offsetType))
+ }
+
+ return &TimestampWithOffsetType{
+ ExtensionBase: arrow.ExtensionBase{
+ Storage: arrow.StructOf(
+ arrow.Field{
+ Name: "timestamp",
+ Type: &arrow.TimestampType{
+ Unit: unit,
+ TimeZone: "UTC",
+ },
+ Nullable: false,
+ },
+ arrow.Field{
+ Name: "offset_minutes",
+ Type: offsetType,
+ Nullable: false,
+ },
+ ),
+ },
+ }, nil
+}
+
+type DictIndexType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)),
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+ offsetType := arrow.DictionaryType{
+ IndexType: arrow.DataType(index),
+ ValueType: arrow.PrimitiveTypes.Int16,
+ Ordered: false,
+ }
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+ // SAFETY: This should never error as DictIndexType is always a valid
index type
+
+ return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+ *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type |
*arrow.Int64Type |
+ *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type |
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E,
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded[E
TimestampWithOffsetRunEndsType](unit arrow.TimeUnit, runEnds E)
*TimestampWithOffsetType {
+ offsetType := arrow.RunEndEncodedOf(arrow.DataType(runEnds),
arrow.PrimitiveTypes.Int16)
+
+ v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, offsetType)
+ // SAFETY: This should never error as TimestampWithOffsetRunEndsType is
always a valid run ends type
+
+ return v
+
+}
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+ return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+ return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`,
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data
string) (arrow.ExtensionType, error) {
+ timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+ if !ok {
+ return nil, fmt.Errorf("invalid storage type for
TimestampWithOffsetType: %s", storageType.Name())
+ }
+
+ return NewTimestampWithOffsetTypeCustomOffset(timeUnit, offsetType)
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType)
bool {
+ return b.ExtensionName() == other.ExtensionName() &&
+ arrow.TypeEqual(b.StorageType(), other.StorageType())
+}
+
+func (b *TimestampWithOffsetType) OffsetType() arrow.DataType {
+ return b.ExtensionBase.Storage.(*arrow.StructType).Field(1).Type
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+ return
b.ExtensionBase.Storage.(*arrow.StructType).Field(0).Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator)
array.Builder {
+ v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), b.OffsetType())
+ return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+ array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+ var o strings.Builder
+ o.WriteString("[")
+ for i := 0; i < a.Len(); i++ {
+ if i > 0 {
+ o.WriteString(" ")
+ }
+ switch {
+ case a.IsNull(i):
+ o.WriteString(array.NullValueStr)
+ default:
+ fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+ }
+ }
+ o.WriteString("]")
+ return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16,
unit arrow.TimeUnit) time.Time {
+ hours := offsetMinutes / 60
+ minutes := offsetMinutes % 60
+ if minutes < 0 {
+ minutes = -minutes
+ }
+
+ loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes),
int(offsetMinutes)*60)
+ return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp,
int16) {
+ // naive "bitwise" conversion to UTC, keeping the underlying date the
same
+ utc := t.UTC()
+ naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(),
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+ offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+
+ // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit
value before
+ // this function. Otherwise, ignoring this error is not safe.
+ timestamp, _ := arrow.TimestampFromTime(utc, unit)
+ return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp,
int16, arrow.TimeUnit) {
+ structs := a.Storage().(*array.Struct)
+
+ timestampField := structs.Field(0)
+ timestamps := timestampField.(*array.Timestamp)
+
+ timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+ utcTimestamp := timestamps.Value(i)
+
+ var offsetMinutes int16
+
+ switch offsets := structs.Field(1).(type) {
+ case *array.Int16:
+ offsetMinutes = offsets.Value(i)
+ case *array.Dictionary:
+ offsetMinutes =
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+ case *array.RunEndEncoded:
+ offsetMinutes =
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+ }
+
+ return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+ if a.IsNull(i) {
+ return time.Unix(0, 0)
+ }
+ utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+ return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array returning the timestamp at each position.
+//
+// If the timestamp is null, the returned time will be the unix epoch.
+//
+// This will iterate using the fastest method given the underlying storage
array
+func (a *TimestampWithOffsetArray) iterValues() iter.Seq[time.Time] {
+ return func(yield func(time.Time) bool) {
+ structs := a.Storage().(*array.Struct)
+ offsets := structs.Field(1)
+ if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee {
+ timestampField := structs.Field(0)
+ timeUnit :=
timestampField.DataType().(*arrow.TimestampType).Unit
+ timestamps := timestampField.(*array.Timestamp)
+
+ offsetValues := reeOffsets.Values().(*array.Int16)
+ offsetPhysicalIdx := 0
+
+ var getRunEnd (func(int) int)
+ switch arr := reeOffsets.RunEndsArr().(type) {
+ case *array.Int16:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ case *array.Int32:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ case *array.Int64:
+ getRunEnd = func(idx int) int { return
int(arr.Value(idx)) }
+ }
+
+ for i := 0; i < a.Len(); i++ {
+ if i >= getRunEnd(offsetPhysicalIdx) {
+ offsetPhysicalIdx += 1
+ }
+
+ ts := time.Unix(0, 0)
+ if a.IsValid(i) {
+ utcTimestamp := timestamps.Value(i)
+ offsetMinutes :=
offsetValues.Value(offsetPhysicalIdx)
+ v := timeFromFieldValues(utcTimestamp,
offsetMinutes, timeUnit)
+ ts = v
+ }
+
+ if !yield(ts) {
+ return
+ }
+ }
+ } else {
+ for i := 0; i < a.Len(); i++ {
+ ts := time.Unix(0, 0)
Review Comment:
This should be equivalent to just doing `var ts time.Time`, except faster
since it avoids the extra processing of calling the `Unix` function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]