zeroshade commented on code in PR #558:
URL: https://github.com/apache/arrow-go/pull/558#discussion_r2771557250


##########
arrow/array/encoded.go:
##########
@@ -386,6 +386,15 @@ func (b *RunEndEncodedBuilder) ContinueRun(n uint64) {
        b.addLength(n)
 }
 
+func (b *RunEndEncodedBuilder) UnsafeAppendBoolToBitmap(v bool) {
+       if !v {
+               b.finishRun()
+       }

Review Comment:
   this seems incorrect, it could be a run of nulls, couldn't it? And wouldn't 
the run itself be handled by the value append? I don't think we need to call 
`b.finishRun` here



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)

Review Comment:
   `arrow.IsInteger` is unnecessary here, by definition only integer types are 
allowed for the IndexTypes of a dictionary type. Anything else is invalid 
anyways, so there's no need to check that.



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+               // FIXME: Technically this should be non-nullable, but a Arrow 
IPC does not deserialize
+               // ValueNullable properly, so enforcing this here would always 
fail when reading from an IPC
+               // stream
+               // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit, 
offsetType arrow.DataType, ok bool) {
+       unit = arrow.Second
+       offsetType = arrow.PrimitiveTypes.Int16
+       ok = false
+
+       st, compat := storageType.(*arrow.StructType)
+       if !compat || st.NumFields() != 2 {
+               return
+       }
+
+       if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat && 
ts.TimeZone == "UTC" {
+               unit = ts.TimeUnit()
+       } else {
+               return
+       }
+
+       maybeOffset := st.Field(1)
+       offsetType = maybeOffset.Type
+
+       ok = st.Field(0).Name == "timestamp" &&
+               !st.Field(0).Nullable &&
+               maybeOffset.Name == "offset_minutes" &&
+               isOffsetTypeOk(offsetType) &&
+               !maybeOffset.Nullable
+       return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+type DictIndexType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit 
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+       offsetType := arrow.DictionaryType{
+               IndexType: arrow.DataType(index),
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+       // SAFETY: This should never error as DictIndexType is always a valid 
index type
+
+       return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.

Review Comment:
   Update the comment to remove this since the generics will affirm that we 
can't have this error



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+               // FIXME: Technically this should be non-nullable, but a Arrow 
IPC does not deserialize
+               // ValueNullable properly, so enforcing this here would always 
fail when reading from an IPC
+               // stream
+               // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit, 
offsetType arrow.DataType, ok bool) {
+       unit = arrow.Second
+       offsetType = arrow.PrimitiveTypes.Int16
+       ok = false
+
+       st, compat := storageType.(*arrow.StructType)
+       if !compat || st.NumFields() != 2 {
+               return
+       }
+
+       if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat && 
ts.TimeZone == "UTC" {
+               unit = ts.TimeUnit()
+       } else {
+               return
+       }
+
+       maybeOffset := st.Field(1)
+       offsetType = maybeOffset.Type
+
+       ok = st.Field(0).Name == "timestamp" &&
+               !st.Field(0).Nullable &&
+               maybeOffset.Name == "offset_minutes" &&
+               isOffsetTypeOk(offsetType) &&
+               !maybeOffset.Nullable
+       return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+type DictIndexType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit 
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+       offsetType := arrow.DictionaryType{
+               IndexType: arrow.DataType(index),
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+       // SAFETY: This should never error as DictIndexType is always a valid 
index type
+
+       return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type

Review Comment:
   this and the `DictIndexType` are both identical, instead of duplicating this 
we should either create a single type constraint or embed one in the other:
   
   ```go
   type TimestampWithOffsetRunEndsType interface {
        DictIndexType
   }
   ```
   
   My personal preference would be to have a single type though, but I'm not 
averse to having the two separate ones if necessary.



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+               // FIXME: Technically this should be non-nullable, but a Arrow 
IPC does not deserialize
+               // ValueNullable properly, so enforcing this here would always 
fail when reading from an IPC
+               // stream
+               // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit, 
offsetType arrow.DataType, ok bool) {
+       unit = arrow.Second
+       offsetType = arrow.PrimitiveTypes.Int16
+       ok = false
+
+       st, compat := storageType.(*arrow.StructType)
+       if !compat || st.NumFields() != 2 {
+               return
+       }
+
+       if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat && 
ts.TimeZone == "UTC" {
+               unit = ts.TimeUnit()
+       } else {
+               return
+       }
+
+       maybeOffset := st.Field(1)
+       offsetType = maybeOffset.Type
+
+       ok = st.Field(0).Name == "timestamp" &&
+               !st.Field(0).Nullable &&
+               maybeOffset.Name == "offset_minutes" &&
+               isOffsetTypeOk(offsetType) &&
+               !maybeOffset.Nullable
+       return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+type DictIndexType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.

Review Comment:
   remove this line from the comment since the generics prevent this from 
happening



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "math"
+       "reflect"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) && 
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+                       // FIXME: Technically this should be non-nullable, but 
a Arrow IPC does not deserialize
+                       // ValueNullable properly, so enforcing this here would 
always fail when reading from an IPC
+                       // stream
+                       // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, 
arrow.DataType, bool) {
+       timeUnit := arrow.Second
+       offsetType := arrow.PrimitiveTypes.Int16
+       switch t := storageType.(type) {
+       case *arrow.StructType:
+               if t.NumFields() != 2 {
+                       return timeUnit, offsetType, false
+               }
+
+               maybeTimestamp := t.Field(0)
+               maybeOffset := t.Field(1)
+
+               timestampOk := false
+               switch timestampType := maybeTimestamp.Type.(type) {
+               case *arrow.TimestampType:
+                       if timestampType.TimeZone == "UTC" {
+                               timestampOk = true
+                               timeUnit = timestampType.TimeUnit()
+                       }
+               default:
+               }
+
+               offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+               ok := maybeTimestamp.Name == "timestamp" &&
+                       timestampOk &&
+                       !maybeTimestamp.Nullable &&
+                       maybeOffset.Name == "offset_minutes" &&
+                       offsetOk &&
+                       !maybeOffset.Nullable
+
+               return timeUnit, maybeOffset.Type, ok
+       default:
+               return timeUnit, offsetType, false
+       }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new 
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit) 
*TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.DictionaryType{
+               IndexType: index,
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+       if !offsetType.ValidRunEndsType(runEnds) {
+               return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", 
runEnds))
+       }
+
+       return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+       return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return 
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+       return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+       return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, 
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data 
string) (arrow.ExtensionType, error) {
+       timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+       if !ok {
+               return nil, fmt.Errorf("invalid storage type for 
TimestampWithOffsetType: %s", storageType.Name())
+       }
+
+       v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+       // SAFETY: the offsetType has already been checked by 
isDataTypeCompatible, so we can ignore the error
+
+       return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) 
bool {
+       return b.ExtensionName() == other.ExtensionName()
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+       return 
b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) 
array.Builder {
+       v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This will never error as Int16 is always a valid type for 
the offset field
+
+       return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+       array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+       var o strings.Builder
+       o.WriteString("[")
+       for i := 0; i < a.Len(); i++ {
+               if i > 0 {
+                       o.WriteString(" ")
+               }
+               switch {
+               case a.IsNull(i):
+                       o.WriteString(array.NullValueStr)
+               default:
+                       fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+               }
+       }
+       o.WriteString("]")
+       return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, 
unit arrow.TimeUnit) time.Time {
+       hours := offsetMinutes / 60
+       minutes := offsetMinutes % 60
+       if minutes < 0 {
+               minutes = -minutes
+       }
+
+       loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), 
int(offsetMinutes)*60)
+       return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, 
int16) {
+       // naive "bitwise" conversion to UTC, keeping the underlying date the 
same
+       utc := t.UTC()
+       naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+       offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+       // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit 
value before
+       // this function. Otherwise, ignoring this error is not safe.
+       timestamp, _ := arrow.TimestampFromTime(utc, unit)
+       return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, 
int16, arrow.TimeUnit) {
+       structs := a.Storage().(*array.Struct)
+
+       timestampField := structs.Field(0)
+       timestamps := timestampField.(*array.Timestamp)
+
+       timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+       utcTimestamp := timestamps.Value(i)
+
+       var offsetMinutes int16
+
+       switch offsets := structs.Field(1).(type) {
+       case *array.Int16:
+               offsetMinutes = offsets.Value(i)
+       case *array.Dictionary:
+               offsetMinutes = 
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+       case *array.RunEndEncoded:
+               offsetMinutes = 
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+       }
+
+       return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+       if a.IsNull(i) {
+               return time.Unix(0, 0)
+       }
+       utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+       return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array and calls the callback with the timestamp at each 
position. If it's null,
+// the timestamp will be nil.
+//
+// This will iterate using the fastest method given the underlying storage 
array
+func (a* TimestampWithOffsetArray) iterValues(callback func(i int, 
utcTimestamp *time.Time)) {
+       structs := a.Storage().(*array.Struct)
+       offsets := structs.Field(1)
+       if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee {
+               timestampField := structs.Field(0)
+               timeUnit := 
timestampField.DataType().(*arrow.TimestampType).Unit
+               timestamps := timestampField.(*array.Timestamp)
+
+               offsetValues := reeOffsets.Values().(*array.Int16)
+               offsetPhysicalIdx := 0
+
+               var getRunEnd (func(int) int)
+               switch arr := reeOffsets.RunEndsArr().(type) {
+               case *array.Int16:
+                       getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+               case *array.Int32:
+                       getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+               case *array.Int64:
+                       getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+               }
+
+               for i := 0; i < a.Len(); i++ {
+                       if i >= getRunEnd(offsetPhysicalIdx) {
+                               offsetPhysicalIdx += 1
+                       }
+
+                       timestamp := (*time.Time)(nil)
+                       if a.IsValid(i) {
+                               utcTimestamp := timestamps.Value(i)
+                               offsetMinutes := 
offsetValues.Value(offsetPhysicalIdx)
+                               v := timeFromFieldValues(utcTimestamp, 
offsetMinutes, timeUnit)
+                               timestamp = &v
+                       } 
+
+                       callback(i, timestamp)
+               }
+       } else {
+               for i := 0; i < a.Len(); i++ {
+                       timestamp := (*time.Time)(nil)
+                       if a.IsValid(i) {
+                               utcTimestamp, offsetMinutes, timeUnit := 
a.rawValueUnsafe(i)
+                               v := timeFromFieldValues(utcTimestamp, 
offsetMinutes, timeUnit)
+                               timestamp = &v
+                       } 
+
+                       callback(i, timestamp)
+               }
+       }
+}
+
+
+func (a *TimestampWithOffsetArray) Values() []time.Time {
+       values := make([]time.Time, a.Len())
+       a.iterValues(func(i int, timestamp *time.Time) {
+               if timestamp == nil {
+                       values[i] = time.Unix(0, 0)
+               } else {
+                       values[i] = *timestamp
+               }
+       })
+       return values
+}
+
+func (a *TimestampWithOffsetArray) ValueStr(i int) string {
+       switch {
+       case a.IsNull(i):
+               return array.NullValueStr
+       default:
+               return a.Value(i).String()
+       }
+}
+
+func (a *TimestampWithOffsetArray) MarshalJSON() ([]byte, error) {
+       values := make([]interface{}, a.Len())
+       a.iterValues(func(i int, timestamp *time.Time) {
+               values[i] = timestamp
+       })
+       return json.Marshal(values)

Review Comment:
   Using `0` as a sentinel for `null` seems incorrect as that means you can't 
actually encode a non-null value of 1970-01-01 and get the correct output. 
Since we need to be able to represent the difference between a *value* of `0` 
and a `null`, then either `iterValues()` needs to return a sequence of 
`*time.Time` or it needs to be a sequence of `iter.Seq2[time.Time, bool]` where 
the bool indicates validity (if the bool is false, then it's null).



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "math"
+       "reflect"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) && 
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+                       // FIXME: Technically this should be non-nullable, but 
a Arrow IPC does not deserialize
+                       // ValueNullable properly, so enforcing this here would 
always fail when reading from an IPC
+                       // stream
+                       // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, 
arrow.DataType, bool) {
+       timeUnit := arrow.Second
+       offsetType := arrow.PrimitiveTypes.Int16
+       switch t := storageType.(type) {
+       case *arrow.StructType:
+               if t.NumFields() != 2 {
+                       return timeUnit, offsetType, false
+               }
+
+               maybeTimestamp := t.Field(0)
+               maybeOffset := t.Field(1)
+
+               timestampOk := false
+               switch timestampType := maybeTimestamp.Type.(type) {
+               case *arrow.TimestampType:
+                       if timestampType.TimeZone == "UTC" {
+                               timestampOk = true
+                               timeUnit = timestampType.TimeUnit()
+                       }
+               default:
+               }
+
+               offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+               ok := maybeTimestamp.Name == "timestamp" &&
+                       timestampOk &&
+                       !maybeTimestamp.Nullable &&
+                       maybeOffset.Name == "offset_minutes" &&
+                       offsetOk &&
+                       !maybeOffset.Nullable
+
+               return timeUnit, maybeOffset.Type, ok
+       default:
+               return timeUnit, offsetType, false
+       }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new 
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit) 
*TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.DictionaryType{
+               IndexType: index,
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+       if !offsetType.ValidRunEndsType(runEnds) {
+               return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", 
runEnds))
+       }
+
+       return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+       return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return 
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+       return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+       return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, 
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data 
string) (arrow.ExtensionType, error) {
+       timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+       if !ok {
+               return nil, fmt.Errorf("invalid storage type for 
TimestampWithOffsetType: %s", storageType.Name())
+       }
+
+       v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+       // SAFETY: the offsetType has already been checked by 
isDataTypeCompatible, so we can ignore the error
+
+       return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) 
bool {
+       return b.ExtensionName() == other.ExtensionName()
+}

Review Comment:
   I agree with this, we should update those builders to error / panic if 
called and add docstring comments to explain that.



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+               // FIXME: Technically this should be non-nullable, but a Arrow 
IPC does not deserialize
+               // ValueNullable properly, so enforcing this here would always 
fail when reading from an IPC
+               // stream
+               // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit, 
offsetType arrow.DataType, ok bool) {
+       unit = arrow.Second
+       offsetType = arrow.PrimitiveTypes.Int16
+       ok = false
+
+       st, compat := storageType.(*arrow.StructType)
+       if !compat || st.NumFields() != 2 {
+               return
+       }
+
+       if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat && 
ts.TimeZone == "UTC" {
+               unit = ts.TimeUnit()
+       } else {
+               return
+       }
+
+       maybeOffset := st.Field(1)
+       offsetType = maybeOffset.Type
+
+       ok = st.Field(0).Name == "timestamp" &&
+               !st.Field(0).Nullable &&
+               maybeOffset.Name == "offset_minutes" &&
+               isOffsetTypeOk(offsetType) &&
+               !maybeOffset.Nullable
+       return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+type DictIndexType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit 
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+       offsetType := arrow.DictionaryType{
+               IndexType: arrow.DataType(index),
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+       // SAFETY: This should never error as DictIndexType is always a valid 
index type
+
+       return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded[E 
TimestampWithOffsetRunEndsType](unit arrow.TimeUnit, runEnds E) 
*TimestampWithOffsetType {
+       offsetType := arrow.RunEndEncodedOf(arrow.DataType(runEnds), 
arrow.PrimitiveTypes.Int16)
+
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, offsetType)
+       // SAFETY: This should never error as TimestampWithOffsetRunEndsType is 
always a valid run ends type
+
+       return v
+
+}
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+       return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return 
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+       return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+       return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, 
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data 
string) (arrow.ExtensionType, error) {
+       timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+       if !ok {
+               return nil, fmt.Errorf("invalid storage type for 
TimestampWithOffsetType: %s", storageType.Name())
+       }
+
+       return NewTimestampWithOffsetTypeCustomOffset(timeUnit, offsetType)
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) 
bool {
+       return b.ExtensionName() == other.ExtensionName() &&
+               arrow.TypeEqual(b.StorageType(), other.StorageType())
+}
+
+func (b *TimestampWithOffsetType) OffsetType() arrow.DataType {
+       return b.ExtensionBase.Storage.(*arrow.StructType).Field(1).Type
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+       return 
b.ExtensionBase.Storage.(*arrow.StructType).Field(0).Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) 
array.Builder {
+       v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), b.OffsetType())
+       return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+       array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+       var o strings.Builder
+       o.WriteString("[")
+       for i := 0; i < a.Len(); i++ {
+               if i > 0 {
+                       o.WriteString(" ")
+               }
+               switch {
+               case a.IsNull(i):
+                       o.WriteString(array.NullValueStr)
+               default:
+                       fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+               }
+       }
+       o.WriteString("]")
+       return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, 
unit arrow.TimeUnit) time.Time {
+       hours := offsetMinutes / 60
+       minutes := offsetMinutes % 60
+       if minutes < 0 {
+               minutes = -minutes
+       }
+
+       loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), 
int(offsetMinutes)*60)
+       return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, 
int16) {
+       // naive "bitwise" conversion to UTC, keeping the underlying date the 
same
+       utc := t.UTC()
+       naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+       offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+
+       // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit 
value before
+       // this function. Otherwise, ignoring this error is not safe.
+       timestamp, _ := arrow.TimestampFromTime(utc, unit)
+       return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, 
int16, arrow.TimeUnit) {
+       structs := a.Storage().(*array.Struct)
+
+       timestampField := structs.Field(0)
+       timestamps := timestampField.(*array.Timestamp)
+
+       timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+       utcTimestamp := timestamps.Value(i)
+
+       var offsetMinutes int16
+
+       switch offsets := structs.Field(1).(type) {
+       case *array.Int16:
+               offsetMinutes = offsets.Value(i)
+       case *array.Dictionary:
+               offsetMinutes = 
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+       case *array.RunEndEncoded:
+               offsetMinutes = 
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+       }
+
+       return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+       if a.IsNull(i) {
+               return time.Unix(0, 0)
+       }
+       utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+       return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array returning the timestamp at each position.
+//
+// If the timestamp is null, the returned time will be the unix epoch.

Review Comment:
   what happens if the value is not-null but is the unix epoch? this iteration 
would be unable to represent that case



##########
arrow/extensions/timestamp_with_offset_test.go:
##########
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions_test
+
+import (
+       "bytes"
+       "fmt"
+       "testing"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/extensions"
+       "github.com/apache/arrow-go/v18/arrow/ipc"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+var testTimeUnit = arrow.Microsecond
+
+var testDate0 = time.Date(2025, 01, 01, 00, 00, 00, 00, 
time.FixedZone("UTC+00:00", 0))
+
+var testZone1 = time.FixedZone("UTC-08:30", -8*60*60 -30*60)
+var testDate1 = testDate0.In(testZone1)
+
+var testZone2 = time.FixedZone("UTC+11:00", +11*60*60)
+var testDate2 = testDate0.In(testZone2)
+
+func dict(index arrow.DataType) arrow.DataType {
+       return &arrow.DictionaryType{
+               IndexType: index,
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered: false,
+       }
+}
+
+func ree(runEnds arrow.DataType) arrow.DataType {
+       v := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+       v.ValueNullable = false
+       return v
+}
+
+// All tests use this in a for loop to make sure everything works for every 
possible
+// encoding of offsets (primitive, dictionary, run-end)
+var allAllowedOffsetTypes = []arrow.DataType{
+       // primitive offsetType
+       arrow.PrimitiveTypes.Int16,
+
+       // dict-encoded offsetType
+       dict(arrow.PrimitiveTypes.Uint8),
+       dict(arrow.PrimitiveTypes.Uint16),
+       dict(arrow.PrimitiveTypes.Uint32),
+       dict(arrow.PrimitiveTypes.Uint64),
+       dict(arrow.PrimitiveTypes.Int8),
+       dict(arrow.PrimitiveTypes.Int16),
+       dict(arrow.PrimitiveTypes.Int32),
+       dict(arrow.PrimitiveTypes.Int64),
+
+       // run-end encoded offsetType
+       ree(arrow.PrimitiveTypes.Int16),
+       ree(arrow.PrimitiveTypes.Int32),
+       ree(arrow.PrimitiveTypes.Int64),
+}
+
+func TestTimestampWithOffsetTypePrimitiveBasics(t *testing.T) {
+       typ := extensions.NewTimestampWithOffsetType(testTimeUnit)
+
+       assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+       assert.True(t, typ.ExtensionEquals(typ))
+
+       assert.True(t, arrow.TypeEqual(typ, typ))
+       assert.True(t, arrow.TypeEqual(
+               arrow.StructOf(
+                       arrow.Field{
+                               Name: "timestamp",
+                               Type: &arrow.TimestampType{
+                                       Unit:     testTimeUnit,
+                                       TimeZone: "UTC",
+                               },
+                               Nullable: false,
+                       },
+                       arrow.Field{
+                               Name:     "offset_minutes",
+                               Type:     arrow.PrimitiveTypes.Int16,
+                               Nullable: false,
+                       },
+               ),
+               typ.StorageType()))
+
+       assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func assertDictBasics[I extensions.DictIndexType](t *testing.T, indexType I) {
+       typ := 
extensions.NewTimestampWithOffsetTypeDictionaryEncoded(testTimeUnit, indexType)
+
+       assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+       assert.True(t, typ.ExtensionEquals(typ))
+
+       assert.True(t, arrow.TypeEqual(typ, typ))
+       assert.True(t, arrow.TypeEqual(
+               arrow.StructOf(
+                       arrow.Field{
+                               Name: "timestamp",
+                               Type: &arrow.TimestampType{
+                                       Unit:     testTimeUnit,
+                                       TimeZone: "UTC",
+                               },
+                               Nullable: false,
+                       },
+                       arrow.Field{
+                               Name: "offset_minutes",
+                               Type: dict(arrow.DataType(indexType)),
+                               Nullable: false,
+                       },
+               ),
+               typ.StorageType()))
+
+       assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func TestTimestampWithOffsetTypeDictionaryEncodedBasics(t *testing.T) {
+       assertDictBasics(t, &arrow.Uint8Type{})
+       assertDictBasics(t, &arrow.Uint16Type{})
+       assertDictBasics(t, &arrow.Uint32Type{})
+       assertDictBasics(t, &arrow.Uint64Type{})
+       assertDictBasics(t, &arrow.Int8Type{})
+       assertDictBasics(t, &arrow.Int16Type{})
+       assertDictBasics(t, &arrow.Int32Type{})
+       assertDictBasics(t, &arrow.Int64Type{})
+}
+
+func assertReeBasics[E extensions.TimestampWithOffsetRunEndsType](t 
*testing.T, runEndsType E) {
+       typ := extensions.NewTimestampWithOffsetTypeRunEndEncoded(testTimeUnit, 
runEndsType)
+
+       assert.Equal(t, "arrow.timestamp_with_offset", typ.ExtensionName())
+       assert.True(t, typ.ExtensionEquals(typ))
+
+       assert.True(t, arrow.TypeEqual(typ, typ))
+       assert.True(t, arrow.TypeEqual(
+               arrow.StructOf(
+                       arrow.Field{
+                               Name: "timestamp",
+                               Type: &arrow.TimestampType{
+                                       Unit:     testTimeUnit,
+                                       TimeZone: "UTC",
+                               },
+                               Nullable: false,
+                       },
+                       arrow.Field{
+                               Name: "offset_minutes",
+                               Type: ree(arrow.DataType(runEndsType)),
+                               Nullable: false,
+                       },
+               ),
+               typ.StorageType()))
+
+       assert.Equal(t, "extension<arrow.timestamp_with_offset>", typ.String())
+}
+
+func TestTimestampWithOffsetTypeRunEndEncodedBasics(t *testing.T) {
+       assertReeBasics(t, &arrow.Int16Type{})
+       assertReeBasics(t, &arrow.Int32Type{})
+       assertReeBasics(t, &arrow.Int64Type{})
+}
+
+func TestTimestampWithOffsetEquals(t *testing.T) {
+       // Completely different types are not equal
+       assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewBool8Type()))
+
+       // Different time units are not equal
+       // assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Microsecond)))
+       // assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Nanosecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Second)))
+       // assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetType(arrow.Second)))
+       //
+       // // Different underlying storage type is not equal
+       // assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetTypeDictionaryEncoded(arrow.Microsecond,
 &arrow.Int16Type{})))
+       // assert.False(t, 
extensions.NewTimestampWithOffsetType(arrow.Microsecond).ExtensionEquals(extensions.NewTimestampWithOffsetTypeRunEndEncoded(arrow.Microsecond,
 &arrow.Int16Type{})))
+       // assert.False(t, 
extensions.NewTimestampWithOffsetTypeDictionaryEncoded(arrow.Microsecond, 
&arrow.Int16Type{}).ExtensionEquals(extensions.NewTimestampWithOffsetTypeRunEndEncoded(arrow.Microsecond,
 &arrow.Int16Type{})))
+       //
+       // // Dict-encoding key type is not equal

Review Comment:
   why are these all commented out?



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,568 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "math"
+       "reflect"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) && 
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+                       // FIXME: Technically this should be non-nullable, but 
a Arrow IPC does not deserialize
+                       // ValueNullable properly, so enforcing this here would 
always fail when reading from an IPC
+                       // stream
+                       // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (arrow.TimeUnit, 
arrow.DataType, bool) {
+       timeUnit := arrow.Second
+       offsetType := arrow.PrimitiveTypes.Int16
+       switch t := storageType.(type) {
+       case *arrow.StructType:
+               if t.NumFields() != 2 {
+                       return timeUnit, offsetType, false
+               }
+
+               maybeTimestamp := t.Field(0)
+               maybeOffset := t.Field(1)
+
+               timestampOk := false
+               switch timestampType := maybeTimestamp.Type.(type) {
+               case *arrow.TimestampType:
+                       if timestampType.TimeZone == "UTC" {
+                               timestampOk = true
+                               timeUnit = timestampType.TimeUnit()
+                       }
+               default:
+               }
+
+               offsetOk := isOffsetTypeOk(maybeOffset.Type)
+
+               ok := maybeTimestamp.Name == "timestamp" &&
+                       timestampOk &&
+                       !maybeTimestamp.Nullable &&
+                       maybeOffset.Name == "offset_minutes" &&
+                       offsetOk &&
+                       !maybeOffset.Nullable
+
+               return timeUnit, maybeOffset.Type, ok
+       default:
+               return timeUnit, offsetType, false
+       }
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+
+// NewTimestampWithOffsetTypePrimitiveEncoded creates a new 
TimestampWithOffsetType with the underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetTypePrimitiveEncoded(unit arrow.TimeUnit) 
*TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetType(unit, arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded(unit arrow.TimeUnit, index 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.DictionaryType{
+               IndexType: index,
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       return NewTimestampWithOffsetType(unit, &offsetType)
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded(unit arrow.TimeUnit, runEnds 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       offsetType := arrow.RunEndEncodedOf(runEnds, arrow.PrimitiveTypes.Int16)
+       if !offsetType.ValidRunEndsType(runEnds) {
+               return nil, errors.New(fmt.Sprintf("Invalid run-ends type %s", 
runEnds))
+       }
+
+       return NewTimestampWithOffsetType(unit, offsetType)
+}
+
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+       return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return 
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+       return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+       return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, 
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data 
string) (arrow.ExtensionType, error) {
+       timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+       if !ok {
+               return nil, fmt.Errorf("invalid storage type for 
TimestampWithOffsetType: %s", storageType.Name())
+       }
+
+       v, _ := NewTimestampWithOffsetType(timeUnit, offsetType)
+       // SAFETY: the offsetType has already been checked by 
isDataTypeCompatible, so we can ignore the error
+
+       return v, nil
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) 
bool {
+       return b.ExtensionName() == other.ExtensionName()
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+       return 
b.ExtensionBase.Storage.(*arrow.StructType).Fields()[0].Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) 
array.Builder {
+       v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This will never error as Int16 is always a valid type for 
the offset field
+
+       return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+       array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+       var o strings.Builder
+       o.WriteString("[")
+       for i := 0; i < a.Len(); i++ {
+               if i > 0 {
+                       o.WriteString(" ")
+               }
+               switch {
+               case a.IsNull(i):
+                       o.WriteString(array.NullValueStr)
+               default:
+                       fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+               }
+       }
+       o.WriteString("]")
+       return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, 
unit arrow.TimeUnit) time.Time {
+       hours := offsetMinutes / 60
+       minutes := offsetMinutes % 60
+       if minutes < 0 {
+               minutes = -minutes
+       }
+
+       loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), 
int(offsetMinutes)*60)
+       return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, 
int16) {
+       // naive "bitwise" conversion to UTC, keeping the underlying date the 
same
+       utc := t.UTC()
+       naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+       offsetMinutes := int16(naiveUtc.Sub(t).Minutes())

Review Comment:
   can you give me an example where it messes up the values? 
   
   Just to verify, the goal here is to take `t` and provide the same *instant 
of time* in UTC while figuring out what the offset was in minutes for the 
timezone, yes? i.e. given `2009-11-10 15:00:00 -0800 PST` this should return 
`2009-11-10 23:00:00 +0000 UTC` and `480`, correct?



##########
arrow/extensions/timestamp_with_offset.go:
##########
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package extensions
+
+import (
+       "errors"
+       "fmt"
+       "iter"
+       "math"
+       "reflect"
+       "slices"
+       "strings"
+       "time"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
+)
+
+// TimestampWithOffsetType represents a timestamp column that stores a 
timezone offset per row instead of
+// applying the same timezone offset to the entire column.
+type TimestampWithOffsetType struct {
+       arrow.ExtensionBase
+}
+
+func isOffsetTypeOk(offsetType arrow.DataType) bool {
+       switch offsetType := offsetType.(type) {
+       case *arrow.Int16Type:
+               return true
+       case *arrow.DictionaryType:
+               return arrow.IsInteger(offsetType.IndexType.ID()) && 
arrow.TypeEqual(offsetType.ValueType, arrow.PrimitiveTypes.Int16)
+       case *arrow.RunEndEncodedType:
+               return offsetType.ValidRunEndsType(offsetType.RunEnds()) &&
+                       arrow.TypeEqual(offsetType.Encoded(), 
arrow.PrimitiveTypes.Int16)
+               // FIXME: Technically this should be non-nullable, but a Arrow 
IPC does not deserialize
+               // ValueNullable properly, so enforcing this here would always 
fail when reading from an IPC
+               // stream
+               // !offsetType.ValueNullable
+       default:
+               return false
+       }
+}
+
+// Whether the storageType is compatible with TimestampWithOffset.
+//
+// Returns (time_unit, offset_type, ok). If ok is false, time_unit and 
offset_type are garbage.
+func isDataTypeCompatible(storageType arrow.DataType) (unit arrow.TimeUnit, 
offsetType arrow.DataType, ok bool) {
+       unit = arrow.Second
+       offsetType = arrow.PrimitiveTypes.Int16
+       ok = false
+
+       st, compat := storageType.(*arrow.StructType)
+       if !compat || st.NumFields() != 2 {
+               return
+       }
+
+       if ts, compat := st.Field(0).Type.(*arrow.TimestampType); compat && 
ts.TimeZone == "UTC" {
+               unit = ts.TimeUnit()
+       } else {
+               return
+       }
+
+       maybeOffset := st.Field(1)
+       offsetType = maybeOffset.Type
+
+       ok = st.Field(0).Name == "timestamp" &&
+               !st.Field(0).Nullable &&
+               maybeOffset.Name == "offset_minutes" &&
+               isOffsetTypeOk(offsetType) &&
+               !maybeOffset.Nullable
+       return
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Int16), where T is any 
TimeUnit.
+func NewTimestampWithOffsetType(unit arrow.TimeUnit) *TimestampWithOffsetType {
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, 
arrow.PrimitiveTypes.Int16)
+       // SAFETY: This should never error as Int16 is always a valid offset 
type
+
+       return v
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=O), where T is any 
TimeUnit and O is a valid offset type.
+//
+// The error will be populated if the data type is not a valid encoding of the 
offsets field.
+func NewTimestampWithOffsetTypeCustomOffset(unit arrow.TimeUnit, offsetType 
arrow.DataType) (*TimestampWithOffsetType, error) {
+       if !isOffsetTypeOk(offsetType) {
+               return nil, errors.New(fmt.Sprintf("Invalid offset type %s", 
offsetType))
+       }
+
+       return &TimestampWithOffsetType{
+               ExtensionBase: arrow.ExtensionBase{
+                       Storage: arrow.StructOf(
+                               arrow.Field{
+                                       Name: "timestamp",
+                                       Type: &arrow.TimestampType{
+                                               Unit:     unit,
+                                               TimeZone: "UTC",
+                                       },
+                                       Nullable: false,
+                               },
+                               arrow.Field{
+                                       Name:     "offset_minutes",
+                                       Type:     offsetType,
+                                       Nullable: false,
+                               },
+                       ),
+               },
+       }, nil
+}
+
+type DictIndexType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=Dictionary(I, Int16)), 
where T is any TimeUnit and I is a
+// valid Dictionary index type.
+//
+// The error will be populated if the index is not a valid dictionary-encoding 
index type.
+func NewTimestampWithOffsetTypeDictionaryEncoded[I DictIndexType](unit 
arrow.TimeUnit, index I) *TimestampWithOffsetType {
+       offsetType := arrow.DictionaryType{
+               IndexType: arrow.DataType(index),
+               ValueType: arrow.PrimitiveTypes.Int16,
+               Ordered:   false,
+       }
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, &offsetType)
+       // SAFETY: This should never error as DictIndexType is always a valid 
index type
+
+       return v
+}
+
+type TimestampWithOffsetRunEndsType interface {
+       *arrow.Int8Type | *arrow.Int16Type | *arrow.Int32Type | 
*arrow.Int64Type |
+               *arrow.Uint8Type | *arrow.Uint16Type | *arrow.Uint32Type | 
*arrow.Uint64Type
+}
+
+// NewTimestampWithOffsetType creates a new TimestampWithOffsetType with the 
underlying storage type set correctly to
+// Struct(timestamp=Timestamp(T, "UTC"), offset_minutes=RunEndEncoded(E, 
Int16)), where T is any TimeUnit and E is a
+// valid run-ends type.
+//
+// The error will be populated if runEnds is not a valid run-end encoding 
run-ends type.
+func NewTimestampWithOffsetTypeRunEndEncoded[E 
TimestampWithOffsetRunEndsType](unit arrow.TimeUnit, runEnds E) 
*TimestampWithOffsetType {
+       offsetType := arrow.RunEndEncodedOf(arrow.DataType(runEnds), 
arrow.PrimitiveTypes.Int16)
+
+       v, _ := NewTimestampWithOffsetTypeCustomOffset(unit, offsetType)
+       // SAFETY: This should never error as TimestampWithOffsetRunEndsType is 
always a valid run ends type
+
+       return v
+
+}
+
+func (b *TimestampWithOffsetType) ArrayType() reflect.Type {
+       return reflect.TypeOf(TimestampWithOffsetArray{})
+}
+
+func (b *TimestampWithOffsetType) ExtensionName() string { return 
"arrow.timestamp_with_offset" }
+
+func (b *TimestampWithOffsetType) String() string {
+       return fmt.Sprintf("extension<%s>", b.ExtensionName())
+}
+
+func (e *TimestampWithOffsetType) MarshalJSON() ([]byte, error) {
+       return []byte(fmt.Sprintf(`{"name":"%s","metadata":%s}`, 
e.ExtensionName(), e.Serialize())), nil
+}
+
+func (b *TimestampWithOffsetType) Serialize() string { return "" }
+
+func (b *TimestampWithOffsetType) Deserialize(storageType arrow.DataType, data 
string) (arrow.ExtensionType, error) {
+       timeUnit, offsetType, ok := isDataTypeCompatible(storageType)
+       if !ok {
+               return nil, fmt.Errorf("invalid storage type for 
TimestampWithOffsetType: %s", storageType.Name())
+       }
+
+       return NewTimestampWithOffsetTypeCustomOffset(timeUnit, offsetType)
+}
+
+func (b *TimestampWithOffsetType) ExtensionEquals(other arrow.ExtensionType) 
bool {
+       return b.ExtensionName() == other.ExtensionName() &&
+               arrow.TypeEqual(b.StorageType(), other.StorageType())
+}
+
+func (b *TimestampWithOffsetType) OffsetType() arrow.DataType {
+       return b.ExtensionBase.Storage.(*arrow.StructType).Field(1).Type
+}
+
+func (b *TimestampWithOffsetType) TimeUnit() arrow.TimeUnit {
+       return 
b.ExtensionBase.Storage.(*arrow.StructType).Field(0).Type.(*arrow.TimestampType).TimeUnit()
+}
+
+func (b *TimestampWithOffsetType) NewBuilder(mem memory.Allocator) 
array.Builder {
+       v, _ := NewTimestampWithOffsetBuilder(mem, b.TimeUnit(), b.OffsetType())
+       return v
+}
+
+// TimestampWithOffsetArray is a simple array of struct
+type TimestampWithOffsetArray struct {
+       array.ExtensionArrayBase
+}
+
+func (a *TimestampWithOffsetArray) String() string {
+       var o strings.Builder
+       o.WriteString("[")
+       for i := 0; i < a.Len(); i++ {
+               if i > 0 {
+                       o.WriteString(" ")
+               }
+               switch {
+               case a.IsNull(i):
+                       o.WriteString(array.NullValueStr)
+               default:
+                       fmt.Fprintf(&o, "\"%s\"", a.Value(i))
+               }
+       }
+       o.WriteString("]")
+       return o.String()
+}
+
+func timeFromFieldValues(utcTimestamp arrow.Timestamp, offsetMinutes int16, 
unit arrow.TimeUnit) time.Time {
+       hours := offsetMinutes / 60
+       minutes := offsetMinutes % 60
+       if minutes < 0 {
+               minutes = -minutes
+       }
+
+       loc := time.FixedZone(fmt.Sprintf("UTC%+03d:%02d", hours, minutes), 
int(offsetMinutes)*60)
+       return utcTimestamp.ToTime(unit).In(loc)
+}
+
+func fieldValuesFromTime(t time.Time, unit arrow.TimeUnit) (arrow.Timestamp, 
int16) {
+       // naive "bitwise" conversion to UTC, keeping the underlying date the 
same
+       utc := t.UTC()
+       naiveUtc := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 
t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
+       offsetMinutes := int16(naiveUtc.Sub(t).Minutes())
+
+       // SAFETY: unit MUST have been validated to a valid arrow.TimeUnit 
value before
+       // this function. Otherwise, ignoring this error is not safe.
+       timestamp, _ := arrow.TimestampFromTime(utc, unit)
+       return timestamp, offsetMinutes
+}
+
+// Get the raw arrow values at the given index
+//
+// SAFETY: the value at i must not be nil
+func (a *TimestampWithOffsetArray) rawValueUnsafe(i int) (arrow.Timestamp, 
int16, arrow.TimeUnit) {
+       structs := a.Storage().(*array.Struct)
+
+       timestampField := structs.Field(0)
+       timestamps := timestampField.(*array.Timestamp)
+
+       timeUnit := timestampField.DataType().(*arrow.TimestampType).Unit
+       utcTimestamp := timestamps.Value(i)
+
+       var offsetMinutes int16
+
+       switch offsets := structs.Field(1).(type) {
+       case *array.Int16:
+               offsetMinutes = offsets.Value(i)
+       case *array.Dictionary:
+               offsetMinutes = 
offsets.Dictionary().(*array.Int16).Value(offsets.GetValueIndex(i))
+       case *array.RunEndEncoded:
+               offsetMinutes = 
offsets.Values().(*array.Int16).Value(offsets.GetPhysicalIndex(i))
+       }
+
+       return utcTimestamp, offsetMinutes, timeUnit
+}
+
+func (a *TimestampWithOffsetArray) Value(i int) time.Time {
+       if a.IsNull(i) {
+               return time.Unix(0, 0)
+       }
+       utcTimestamp, offsetMinutes, timeUnit := a.rawValueUnsafe(i)
+       return timeFromFieldValues(utcTimestamp, offsetMinutes, timeUnit)
+}
+
+// Iterates over the array returning the timestamp at each position.
+//
+// If the timestamp is null, the returned time will be the unix epoch.
+//
+// This will iterate using the fastest method given the underlying storage 
array
+func (a *TimestampWithOffsetArray) iterValues() iter.Seq[time.Time] {
+       return func(yield func(time.Time) bool) {
+               structs := a.Storage().(*array.Struct)
+               offsets := structs.Field(1)
+               if reeOffsets, isRee := offsets.(*array.RunEndEncoded); isRee {
+                       timestampField := structs.Field(0)
+                       timeUnit := 
timestampField.DataType().(*arrow.TimestampType).Unit
+                       timestamps := timestampField.(*array.Timestamp)
+
+                       offsetValues := reeOffsets.Values().(*array.Int16)
+                       offsetPhysicalIdx := 0
+
+                       var getRunEnd (func(int) int)
+                       switch arr := reeOffsets.RunEndsArr().(type) {
+                       case *array.Int16:
+                               getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+                       case *array.Int32:
+                               getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+                       case *array.Int64:
+                               getRunEnd = func(idx int) int { return 
int(arr.Value(idx)) }
+                       }
+
+                       for i := 0; i < a.Len(); i++ {
+                               if i >= getRunEnd(offsetPhysicalIdx) {
+                                       offsetPhysicalIdx += 1
+                               }
+
+                               ts := time.Unix(0, 0)
+                               if a.IsValid(i) {
+                                       utcTimestamp := timestamps.Value(i)
+                                       offsetMinutes := 
offsetValues.Value(offsetPhysicalIdx)
+                                       v := timeFromFieldValues(utcTimestamp, 
offsetMinutes, timeUnit)
+                                       ts = v
+                               }
+
+                               if !yield(ts) {
+                                       return
+                               }
+                       }
+               } else {
+                       for i := 0; i < a.Len(); i++ {
+                               ts := time.Unix(0, 0)

Review Comment:
   this should be equivalent to just doing `var ts time.Time`, except faster 
since it avoids the extra processing of calling the `Unix` function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to