This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 858574d0bd GH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp 
Instant/Local Semantics (#39467)
858574d0bd is described below

commit 858574d0bd1f3ef4157d0446cfb05cef05aac96b
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Thu Jan 18 11:09:50 2024 -0500

    GH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp Instant/Local 
Semantics (#39467)
    
    
    
    ### Rationale for this change
    
    Closes: #39466
    
    ### What changes are included in this PR?
    
    - Update logic for determining whether an Arrow Timestamp should have 
`isAdjustedToUTC=true` on conversion to Parquet.
    - Update conversion from Parquet Timestamp to Arrow Timestamp to align with 
Parquet Format 
[backward-compatibility](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485)
 rules.
    - Refactor Timestamp serialization methods to reduce duplicated code
    
    ### Are these changes tested?
    
    Yes,
    - Logical type mapping in existing test updated.
    - New tests for roundtrip behavior of timestamps with various timezone 
settings, with/without store_schema enabled.
    - New test to clarify equality behavior of timestamps with instant 
semantics, as well as Go-related quirks with timezone-unaware timestamps.
    
    ### Are there any user-facing changes?
    
    Yes, users of `pqarrow.FileWriter` will produce Parquet files in which the 
`TIMESTAMP` type is normalized to UTC IFF the Arrow type provided has a 
timezone specified. This is different from the current Go behavior but aligned 
with that of other implementations.
    
    The conversion from Parquet to Arrow has been updated as well to reflect 
the Parquet format 
[document](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485).
 Rust already 
[implements](https://github.com/apache/arrow-rs/blob/a61e824abdd7b38ea214828480430ff2a13f2ead/parquet/src/arrow/schema/primitive.rs#L211-L239)
 the spec as described and #39489 has been reported due to a mismatch in the 
handling of convertedTypes in C++.
    
    * Closes: #39466
    
    Authored-by: Joel Lubinitsky <[email protected]>
    Signed-off-by: Matt Topol <[email protected]>
---
 go/arrow/array/timestamp.go             | 11 +++---
 go/arrow/array/timestamp_test.go        | 49 ++++++++++++++++++++++-
 go/arrow/datatype_fixedwidth.go         | 19 +++------
 go/parquet/pqarrow/encode_arrow_test.go | 70 +++++++++++++++++++++++++++++++++
 go/parquet/pqarrow/schema.go            | 13 +++---
 go/parquet/pqarrow/schema_test.go       |  6 +--
 6 files changed, 140 insertions(+), 28 deletions(-)

diff --git a/go/arrow/array/timestamp.go b/go/arrow/array/timestamp.go
index 6ffb43e067..0cc46a127f 100644
--- a/go/arrow/array/timestamp.go
+++ b/go/arrow/array/timestamp.go
@@ -91,16 +91,15 @@ func (a *Timestamp) ValueStr(i int) string {
                return NullValueStr
        }
 
-       dt := a.DataType().(*arrow.TimestampType)
-       z, _ := dt.GetZone()
-       return a.values[i].ToTime(dt.Unit).In(z).Format("2006-01-02 
15:04:05.999999999Z0700")
+       toTime, _ := a.DataType().(*arrow.TimestampType).GetToTimeFunc()
+       return toTime(a.values[i]).Format("2006-01-02 15:04:05.999999999Z0700")
 }
 
 func (a *Timestamp) GetOneForMarshal(i int) interface{} {
-       if a.IsNull(i) {
-               return nil
+       if val := a.ValueStr(i); val != NullValueStr {
+               return val
        }
-       return 
a.values[i].ToTime(a.DataType().(*arrow.TimestampType).Unit).Format("2006-01-02 
15:04:05.999999999")
+       return nil
 }
 
 func (a *Timestamp) MarshalJSON() ([]byte, error) {
diff --git a/go/arrow/array/timestamp_test.go b/go/arrow/array/timestamp_test.go
index acbad8b586..c172ad811d 100644
--- a/go/arrow/array/timestamp_test.go
+++ b/go/arrow/array/timestamp_test.go
@@ -234,7 +234,7 @@ func TestTimestampBuilder_Resize(t *testing.T) {
        assert.Equal(t, 5, ab.Len())
 }
 
-func TestTimestampValueStr(t *testing.T) {             
+func TestTimestampValueStr(t *testing.T) {
        mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
        defer mem.AssertSize(t, 0)
 
@@ -251,3 +251,50 @@ func TestTimestampValueStr(t *testing.T) {
        assert.Equal(t, "1968-11-30 13:30:45-0700", arr.ValueStr(0))
        assert.Equal(t, "2016-02-29 10:42:23-0700", arr.ValueStr(1))
 }
+
+func TestTimestampEquality(t *testing.T) {
+       mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+       defer mem.AssertSize(t, 0)
+
+       tsDatatypes := []*arrow.TimestampType{
+               {Unit: arrow.Second},
+               {Unit: arrow.Second, TimeZone: "UTC"},
+               {Unit: arrow.Second, TimeZone: "America/Phoenix"},
+       }
+
+       arrs := make([]*array.Timestamp, 0, len(tsDatatypes))
+       for _, dt := range tsDatatypes {
+               bldr := array.NewTimestampBuilder(mem, dt)
+               defer bldr.Release()
+
+               bldr.Append(-34226955)
+               bldr.Append(1456767743)
+
+               arr := bldr.NewTimestampArray()
+               defer arr.Release()
+
+               arrs = append(arrs, arr)
+       }
+
+       // No timezone, "wall clock" semantics
+       // These timestamps have no actual timezone, but we still represent as 
UTC per Go conventions
+       assert.Equal(t, "1968-11-30 20:30:45Z", arrs[0].ValueStr(0))
+       assert.Equal(t, "2016-02-29 17:42:23Z", arrs[0].ValueStr(1))
+
+       // UTC timezone, "instant" semantics
+       assert.Equal(t, "1968-11-30 20:30:45Z", arrs[1].ValueStr(0))
+       assert.Equal(t, "2016-02-29 17:42:23Z", arrs[1].ValueStr(1))
+
+       // America/Phoenix timezone, "instant" semantics
+       assert.Equal(t, "1968-11-30 13:30:45-0700", arrs[2].ValueStr(0))
+       assert.Equal(t, "2016-02-29 10:42:23-0700", arrs[2].ValueStr(1))
+
+       // Despite timezone and semantics, the physical values are equivalent
+       assert.Equal(t, arrs[0].Value(0), arrs[1].Value(0))
+       assert.Equal(t, arrs[0].Value(0), arrs[2].Value(0))
+       assert.Equal(t, arrs[1].Value(0), arrs[2].Value(0))
+
+       assert.Equal(t, arrs[0].Value(1), arrs[1].Value(1))
+       assert.Equal(t, arrs[0].Value(1), arrs[2].Value(1))
+       assert.Equal(t, arrs[1].Value(1), arrs[2].Value(1))
+}
diff --git a/go/arrow/datatype_fixedwidth.go b/go/arrow/datatype_fixedwidth.go
index 1a3074e59e..158dbd67b1 100644
--- a/go/arrow/datatype_fixedwidth.go
+++ b/go/arrow/datatype_fixedwidth.go
@@ -348,8 +348,11 @@ type TemporalWithUnit interface {
 }
 
 // TimestampType is encoded as a 64-bit signed integer since the UNIX epoch 
(2017-01-01T00:00:00Z).
-// The zero-value is a second and time zone neutral. Time zone neutral can be
-// considered UTC without having "UTC" as a time zone.
+// The zero-value is a second and time zone neutral. In Arrow semantics, time 
zone neutral does not
+// represent a physical point in time, but rather a "wall clock" time that 
only has meaning within
+// the context that produced it. In Go, time.Time can only represent instants; 
there is no notion
+// of "wall clock" time. Therefore, time zone neutral timestamps are 
represented as UTC per Go
+// conventions even though the Arrow type itself has no time zone.
 type TimestampType struct {
        Unit     TimeUnit
        TimeZone string
@@ -454,17 +457,7 @@ func (t *TimestampType) GetToTimeFunc() (func(Timestamp) 
time.Time, error) {
                return nil, err
        }
 
-       switch t.Unit {
-       case Second:
-               return func(v Timestamp) time.Time { return time.Unix(int64(v), 
0).In(tz) }, nil
-       case Millisecond:
-               return func(v Timestamp) time.Time { return 
time.UnixMilli(int64(v)).In(tz) }, nil
-       case Microsecond:
-               return func(v Timestamp) time.Time { return 
time.UnixMicro(int64(v)).In(tz) }, nil
-       case Nanosecond:
-               return func(v Timestamp) time.Time { return time.Unix(0, 
int64(v)).In(tz) }, nil
-       }
-       return nil, fmt.Errorf("invalid timestamp unit: %s", t.Unit)
+       return func(v Timestamp) time.Time { return v.ToTime(t.Unit).In(tz) }, 
nil
 }
 
 // Time32Type is encoded as a 32-bit signed integer, representing either 
seconds or milliseconds since midnight.
diff --git a/go/parquet/pqarrow/encode_arrow_test.go 
b/go/parquet/pqarrow/encode_arrow_test.go
index 75eb965d03..25d31b54e1 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -171,6 +171,41 @@ func makeDateTypeTable(mem memory.Allocator, expected 
bool, partialDays bool) ar
        return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
 }
 
+func makeTimestampTypeTable(mem memory.Allocator, expected bool) arrow.Table {
+       isValid := []bool{true, true, true, false, true, true}
+
+       // Timestamp with relative (i.e. local) semantics. Make sure it 
roundtrips without being incorrectly converted to an absolute point in time.
+       f0 := arrow.Field{Name: "f0", Type: &arrow.TimestampType{Unit: 
arrow.Millisecond}, Nullable: true, Metadata: 
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"})}
+
+       // Timestamp with absolute (i.e. instant) semantics. The physical 
representation is always from Unix epoch in UTC timezone.
+       // TimeZone is used for display purposes and can be stripped on 
roundtrip without changing the actual instant referred to.
+       // WithStoreSchema will preserve the original timezone, but the instant 
in will be equivalent even if it's not used.
+       f1 := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit: 
arrow.Millisecond, TimeZone: "EST"}, Nullable: true, Metadata: 
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})}
+       f1X := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit: 
arrow.Millisecond, TimeZone: "UTC"}, Nullable: true, Metadata: 
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})}
+
+       fieldList := []arrow.Field{f0}
+       if expected {
+               fieldList = append(fieldList, f1X)
+       } else {
+               fieldList = append(fieldList, f1)
+       }
+
+       arrsc := arrow.NewSchema(fieldList, nil)
+
+       ts64msValues := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272, 
1489272, 1489273}
+
+       bldr := array.NewRecordBuilder(mem, arrsc)
+       defer bldr.Release()
+
+       bldr.Field(0).(*array.TimestampBuilder).AppendValues(ts64msValues, 
isValid)
+       bldr.Field(1).(*array.TimestampBuilder).AppendValues(ts64msValues, 
isValid)
+
+       rec := bldr.NewRecord()
+       defer rec.Release()
+
+       return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
+}
+
 func TestWriteArrowCols(t *testing.T) {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(t, 0)
@@ -954,6 +989,25 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() {
        ps.Truef(array.TableEqual(date32ExpectedOutputTable, 
roundTripOutputTable), "expected table: %s\ngot table: %s", 
date32ExpectedOutputTable, roundTripOutputTable)
 }
 
+func (ps *ParquetIOTestSuite) TestTimestampTZReadWriteTable() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       inputTable := makeTimestampTypeTable(mem, false)
+       defer inputTable.Release()
+       buf := writeTableToBuffer(ps.T(), mem, inputTable, 
inputTable.NumRows(), 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+       defer buf.Release()
+
+       reader := ps.createReader(mem, buf.Bytes())
+       roundTripOutputTable := ps.readTable(reader)
+       defer roundTripOutputTable.Release()
+
+       expectedOutputTable := makeTimestampTypeTable(mem, true)
+       defer expectedOutputTable.Release()
+
+       ps.Truef(array.TableEqual(expectedOutputTable, roundTripOutputTable), 
"expected table: %s\ngot table: %s", expectedOutputTable, roundTripOutputTable)
+}
+
 func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(ps.T(), 0)
@@ -973,6 +1027,22 @@ func (ps *ParquetIOTestSuite) 
TestDate64ReadWriteTableWithPartialDays() {
        ps.Truef(array.TableEqual(date32ExpectedOutputTable, 
roundTripOutputTable), "expected table: %s\ngot table: %s", 
date32ExpectedOutputTable, roundTripOutputTable)
 }
 
+func (ps *ParquetIOTestSuite) TestTimestampTZStoreSchemaReadWriteTable() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(ps.T(), 0)
+
+       inputTable := makeTimestampTypeTable(mem, false)
+       defer inputTable.Release()
+       buf := writeTableToBuffer(ps.T(), mem, inputTable, 
inputTable.NumRows(), 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), 
pqarrow.WithStoreSchema()))
+       defer buf.Release()
+
+       reader := ps.createReader(mem, buf.Bytes())
+       roundTripOutputTable := ps.readTable(reader)
+       defer roundTripOutputTable.Release()
+
+       ps.Truef(array.TableEqual(inputTable, roundTripOutputTable), "expected 
table: %s\ngot table: %s", inputTable, roundTripOutputTable)
+}
+
 func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
        mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
        defer mem.AssertSize(ps.T(), 0)
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index 383d47fbaa..f2aa4cdfe0 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -125,7 +125,7 @@ func isDictionaryReadSupported(dt arrow.DataType) bool {
 }
 
 func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit) 
schema.LogicalType {
-       utc := typ.TimeZone == "" || typ.TimeZone == "UTC"
+       isAdjustedToUTC := typ.TimeZone != ""
 
        // for forward compatibility reasons, and because there's no other way
        // to signal to old readers that values are timestamps, we force
@@ -146,7 +146,7 @@ func arrowTimestampToLogical(typ *arrow.TimestampType, unit 
arrow.TimeUnit) sche
                return schema.NoLogicalType{}
        }
 
-       return schema.NewTimestampLogicalTypeForce(utc, scunit)
+       return schema.NewTimestampLogicalTypeForce(isAdjustedToUTC, scunit)
 }
 
 func getTimestampMeta(typ *arrow.TimestampType, props 
*parquet.WriterProperties, arrprops ArrowWriterProperties) (parquet.Type, 
schema.LogicalType, error) {
@@ -519,9 +519,12 @@ func arrowTime64(logical *schema.TimeLogicalType) 
(arrow.DataType, error) {
 }
 
 func arrowTimestamp(logical *schema.TimestampLogicalType) (arrow.DataType, 
error) {
-       tz := "UTC"
-       if logical.IsFromConvertedType() {
-               tz = ""
+       tz := ""
+
+       // ConvertedTypes are adjusted to UTC per backward compatibility 
guidelines
+       // 
https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485
+       if logical.IsAdjustedToUTC() || logical.IsFromConvertedType() {
+               tz = "UTC"
        }
 
        switch logical.TimeUnit() {
diff --git a/go/parquet/pqarrow/schema_test.go 
b/go/parquet/pqarrow/schema_test.go
index a3c2c7a4ff..f320b90303 100644
--- a/go/parquet/pqarrow/schema_test.go
+++ b/go/parquet/pqarrow/schema_test.go
@@ -304,7 +304,7 @@ func TestCoerceTImestampV1(t *testing.T) {
        arrowFields := make([]arrow.Field, 0)
 
        parquetFields = append(parquetFields, 
schema.Must(schema.NewPrimitiveNodeLogical("timestamp", 
parquet.Repetitions.Required,
-               schema.NewTimestampLogicalTypeForce(false, 
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
+               schema.NewTimestampLogicalTypeForce(true, 
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
        arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: 
&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "EST"}})
 
        arrowSchema := arrow.NewSchema(arrowFields, nil)
@@ -323,11 +323,11 @@ func TestAutoCoerceTImestampV1(t *testing.T) {
        arrowFields := make([]arrow.Field, 0)
 
        parquetFields = append(parquetFields, 
schema.Must(schema.NewPrimitiveNodeLogical("timestamp", 
parquet.Repetitions.Required,
-               schema.NewTimestampLogicalTypeForce(false, 
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
+               schema.NewTimestampLogicalTypeForce(true, 
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
        arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type: 
&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "EST"}})
 
        parquetFields = append(parquetFields, 
schema.Must(schema.NewPrimitiveNodeLogical("timestamp[ms]", 
parquet.Repetitions.Required,
-               schema.NewTimestampLogicalTypeForce(true, 
schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
+               schema.NewTimestampLogicalTypeForce(false, 
schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
        arrowFields = append(arrowFields, arrow.Field{Name: "timestamp[ms]", 
Type: &arrow.TimestampType{Unit: arrow.Second}})
 
        arrowSchema := arrow.NewSchema(arrowFields, nil)

Reply via email to