This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 858574d0bd GH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp
Instant/Local Semantics (#39467)
858574d0bd is described below
commit 858574d0bd1f3ef4157d0446cfb05cef05aac96b
Author: Joel Lubinitsky <[email protected]>
AuthorDate: Thu Jan 18 11:09:50 2024 -0500
GH-39466: [Go][Parquet] Align Arrow and Parquet Timestamp Instant/Local
Semantics (#39467)
### Rationale for this change
Closes: #39466
### What changes are included in this PR?
- Update logic for determining whether an Arrow Timestamp should have
`isAdjustedToUTC=true` on conversion to Parquet.
- Update conversion from Parquet Timestamp to Arrow Timestamp to align with
Parquet Format
[backward-compatibility](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485)
rules.
- Refactor Timestamp serialization methods to reduce duplicated code
### Are these changes tested?
Yes,
- Logical type mapping in existing test updated.
- New tests for roundtrip behavior of timestamps with various timezone
settings, with/without store_schema enabled.
- New test to clarify equality behavior of timestamps with instant
semantics, as well as Go-related quirks with timezone-unaware timestamps.
### Are there any user-facing changes?
Yes, users of `pqarrow.FileWriter` will produce Parquet files in which the
`TIMESTAMP` type is normalized to UTC IFF the Arrow type provided has a
timezone specified. This is different from the current Go behavior but aligned
with that of other implementations.
The conversion from Parquet to Arrow has been updated as well to reflect
the Parquet format
[document](https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485).
Rust already
[implements](https://github.com/apache/arrow-rs/blob/a61e824abdd7b38ea214828480430ff2a13f2ead/parquet/src/arrow/schema/primitive.rs#L211-L239)
the spec as described and #39489 has been reported due to a mismatch in the
handling of convertedTypes in C++.
* Closes: #39466
Authored-by: Joel Lubinitsky <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
---
go/arrow/array/timestamp.go | 11 +++---
go/arrow/array/timestamp_test.go | 49 ++++++++++++++++++++++-
go/arrow/datatype_fixedwidth.go | 19 +++------
go/parquet/pqarrow/encode_arrow_test.go | 70 +++++++++++++++++++++++++++++++++
go/parquet/pqarrow/schema.go | 13 +++---
go/parquet/pqarrow/schema_test.go | 6 +--
6 files changed, 140 insertions(+), 28 deletions(-)
diff --git a/go/arrow/array/timestamp.go b/go/arrow/array/timestamp.go
index 6ffb43e067..0cc46a127f 100644
--- a/go/arrow/array/timestamp.go
+++ b/go/arrow/array/timestamp.go
@@ -91,16 +91,15 @@ func (a *Timestamp) ValueStr(i int) string {
return NullValueStr
}
- dt := a.DataType().(*arrow.TimestampType)
- z, _ := dt.GetZone()
- return a.values[i].ToTime(dt.Unit).In(z).Format("2006-01-02
15:04:05.999999999Z0700")
+ toTime, _ := a.DataType().(*arrow.TimestampType).GetToTimeFunc()
+ return toTime(a.values[i]).Format("2006-01-02 15:04:05.999999999Z0700")
}
func (a *Timestamp) GetOneForMarshal(i int) interface{} {
- if a.IsNull(i) {
- return nil
+ if val := a.ValueStr(i); val != NullValueStr {
+ return val
}
- return
a.values[i].ToTime(a.DataType().(*arrow.TimestampType).Unit).Format("2006-01-02
15:04:05.999999999")
+ return nil
}
func (a *Timestamp) MarshalJSON() ([]byte, error) {
diff --git a/go/arrow/array/timestamp_test.go b/go/arrow/array/timestamp_test.go
index acbad8b586..c172ad811d 100644
--- a/go/arrow/array/timestamp_test.go
+++ b/go/arrow/array/timestamp_test.go
@@ -234,7 +234,7 @@ func TestTimestampBuilder_Resize(t *testing.T) {
assert.Equal(t, 5, ab.Len())
}
-func TestTimestampValueStr(t *testing.T) {
+func TestTimestampValueStr(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
@@ -251,3 +251,50 @@ func TestTimestampValueStr(t *testing.T) {
assert.Equal(t, "1968-11-30 13:30:45-0700", arr.ValueStr(0))
assert.Equal(t, "2016-02-29 10:42:23-0700", arr.ValueStr(1))
}
+
+func TestTimestampEquality(t *testing.T) {
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ tsDatatypes := []*arrow.TimestampType{
+ {Unit: arrow.Second},
+ {Unit: arrow.Second, TimeZone: "UTC"},
+ {Unit: arrow.Second, TimeZone: "America/Phoenix"},
+ }
+
+ arrs := make([]*array.Timestamp, 0, len(tsDatatypes))
+ for _, dt := range tsDatatypes {
+ bldr := array.NewTimestampBuilder(mem, dt)
+ defer bldr.Release()
+
+ bldr.Append(-34226955)
+ bldr.Append(1456767743)
+
+ arr := bldr.NewTimestampArray()
+ defer arr.Release()
+
+ arrs = append(arrs, arr)
+ }
+
+ // No timezone, "wall clock" semantics
+ // These timestamps have no actual timezone, but we still represent as
UTC per Go conventions
+ assert.Equal(t, "1968-11-30 20:30:45Z", arrs[0].ValueStr(0))
+ assert.Equal(t, "2016-02-29 17:42:23Z", arrs[0].ValueStr(1))
+
+ // UTC timezone, "instant" semantics
+ assert.Equal(t, "1968-11-30 20:30:45Z", arrs[1].ValueStr(0))
+ assert.Equal(t, "2016-02-29 17:42:23Z", arrs[1].ValueStr(1))
+
+ // America/Phoenix timezone, "instant" semantics
+ assert.Equal(t, "1968-11-30 13:30:45-0700", arrs[2].ValueStr(0))
+ assert.Equal(t, "2016-02-29 10:42:23-0700", arrs[2].ValueStr(1))
+
+ // Despite timezone and semantics, the physical values are equivalent
+ assert.Equal(t, arrs[0].Value(0), arrs[1].Value(0))
+ assert.Equal(t, arrs[0].Value(0), arrs[2].Value(0))
+ assert.Equal(t, arrs[1].Value(0), arrs[2].Value(0))
+
+ assert.Equal(t, arrs[0].Value(1), arrs[1].Value(1))
+ assert.Equal(t, arrs[0].Value(1), arrs[2].Value(1))
+ assert.Equal(t, arrs[1].Value(1), arrs[2].Value(1))
+}
diff --git a/go/arrow/datatype_fixedwidth.go b/go/arrow/datatype_fixedwidth.go
index 1a3074e59e..158dbd67b1 100644
--- a/go/arrow/datatype_fixedwidth.go
+++ b/go/arrow/datatype_fixedwidth.go
@@ -348,8 +348,11 @@ type TemporalWithUnit interface {
}
// TimestampType is encoded as a 64-bit signed integer since the UNIX epoch
(2017-01-01T00:00:00Z).
-// The zero-value is a second and time zone neutral. Time zone neutral can be
-// considered UTC without having "UTC" as a time zone.
+// The zero-value is a second and time zone neutral. In Arrow semantics, time
zone neutral does not
+// represent a physical point in time, but rather a "wall clock" time that
only has meaning within
+// the context that produced it. In Go, time.Time can only represent instants;
there is no notion
+// of "wall clock" time. Therefore, time zone neutral timestamps are
represented as UTC per Go
+// conventions even though the Arrow type itself has no time zone.
type TimestampType struct {
Unit TimeUnit
TimeZone string
@@ -454,17 +457,7 @@ func (t *TimestampType) GetToTimeFunc() (func(Timestamp)
time.Time, error) {
return nil, err
}
- switch t.Unit {
- case Second:
- return func(v Timestamp) time.Time { return time.Unix(int64(v),
0).In(tz) }, nil
- case Millisecond:
- return func(v Timestamp) time.Time { return
time.UnixMilli(int64(v)).In(tz) }, nil
- case Microsecond:
- return func(v Timestamp) time.Time { return
time.UnixMicro(int64(v)).In(tz) }, nil
- case Nanosecond:
- return func(v Timestamp) time.Time { return time.Unix(0,
int64(v)).In(tz) }, nil
- }
- return nil, fmt.Errorf("invalid timestamp unit: %s", t.Unit)
+ return func(v Timestamp) time.Time { return v.ToTime(t.Unit).In(tz) },
nil
}
// Time32Type is encoded as a 32-bit signed integer, representing either
seconds or milliseconds since midnight.
diff --git a/go/parquet/pqarrow/encode_arrow_test.go
b/go/parquet/pqarrow/encode_arrow_test.go
index 75eb965d03..25d31b54e1 100644
--- a/go/parquet/pqarrow/encode_arrow_test.go
+++ b/go/parquet/pqarrow/encode_arrow_test.go
@@ -171,6 +171,41 @@ func makeDateTypeTable(mem memory.Allocator, expected
bool, partialDays bool) ar
return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
}
+func makeTimestampTypeTable(mem memory.Allocator, expected bool) arrow.Table {
+ isValid := []bool{true, true, true, false, true, true}
+
+ // Timestamp with relative (i.e. local) semantics. Make sure it
roundtrips without being incorrectly converted to an absolute point in time.
+ f0 := arrow.Field{Name: "f0", Type: &arrow.TimestampType{Unit:
arrow.Millisecond}, Nullable: true, Metadata:
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"1"})}
+
+ // Timestamp with absolute (i.e. instant) semantics. The physical
representation is always from Unix epoch in UTC timezone.
+ // TimeZone is used for display purposes and can be stripped on
roundtrip without changing the actual instant referred to.
+ // WithStoreSchema will preserve the original timezone, but the instant
in will be equivalent even if it's not used.
+ f1 := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit:
arrow.Millisecond, TimeZone: "EST"}, Nullable: true, Metadata:
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})}
+ f1X := arrow.Field{Name: "f1", Type: &arrow.TimestampType{Unit:
arrow.Millisecond, TimeZone: "UTC"}, Nullable: true, Metadata:
arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{"2"})}
+
+ fieldList := []arrow.Field{f0}
+ if expected {
+ fieldList = append(fieldList, f1X)
+ } else {
+ fieldList = append(fieldList, f1)
+ }
+
+ arrsc := arrow.NewSchema(fieldList, nil)
+
+ ts64msValues := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272,
1489272, 1489273}
+
+ bldr := array.NewRecordBuilder(mem, arrsc)
+ defer bldr.Release()
+
+ bldr.Field(0).(*array.TimestampBuilder).AppendValues(ts64msValues,
isValid)
+ bldr.Field(1).(*array.TimestampBuilder).AppendValues(ts64msValues,
isValid)
+
+ rec := bldr.NewRecord()
+ defer rec.Release()
+
+ return array.NewTableFromRecords(arrsc, []arrow.Record{rec})
+}
+
func TestWriteArrowCols(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(t, 0)
@@ -954,6 +989,25 @@ func (ps *ParquetIOTestSuite) TestDate64ReadWriteTable() {
ps.Truef(array.TableEqual(date32ExpectedOutputTable,
roundTripOutputTable), "expected table: %s\ngot table: %s",
date32ExpectedOutputTable, roundTripOutputTable)
}
+func (ps *ParquetIOTestSuite) TestTimestampTZReadWriteTable() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ inputTable := makeTimestampTypeTable(mem, false)
+ defer inputTable.Release()
+ buf := writeTableToBuffer(ps.T(), mem, inputTable,
inputTable.NumRows(),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
+ defer buf.Release()
+
+ reader := ps.createReader(mem, buf.Bytes())
+ roundTripOutputTable := ps.readTable(reader)
+ defer roundTripOutputTable.Release()
+
+ expectedOutputTable := makeTimestampTypeTable(mem, true)
+ defer expectedOutputTable.Release()
+
+ ps.Truef(array.TableEqual(expectedOutputTable, roundTripOutputTable),
"expected table: %s\ngot table: %s", expectedOutputTable, roundTripOutputTable)
+}
+
func (ps *ParquetIOTestSuite) TestDate64ReadWriteTableWithPartialDays() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
@@ -973,6 +1027,22 @@ func (ps *ParquetIOTestSuite)
TestDate64ReadWriteTableWithPartialDays() {
ps.Truef(array.TableEqual(date32ExpectedOutputTable,
roundTripOutputTable), "expected table: %s\ngot table: %s",
date32ExpectedOutputTable, roundTripOutputTable)
}
+func (ps *ParquetIOTestSuite) TestTimestampTZStoreSchemaReadWriteTable() {
+ mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+ defer mem.AssertSize(ps.T(), 0)
+
+ inputTable := makeTimestampTypeTable(mem, false)
+ defer inputTable.Release()
+ buf := writeTableToBuffer(ps.T(), mem, inputTable,
inputTable.NumRows(),
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem),
pqarrow.WithStoreSchema()))
+ defer buf.Release()
+
+ reader := ps.createReader(mem, buf.Bytes())
+ roundTripOutputTable := ps.readTable(reader)
+ defer roundTripOutputTable.Release()
+
+ ps.Truef(array.TableEqual(inputTable, roundTripOutputTable), "expected
table: %s\ngot table: %s", inputTable, roundTripOutputTable)
+}
+
func (ps *ParquetIOTestSuite) TestLargeBinaryReadWriteTable() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go
index 383d47fbaa..f2aa4cdfe0 100644
--- a/go/parquet/pqarrow/schema.go
+++ b/go/parquet/pqarrow/schema.go
@@ -125,7 +125,7 @@ func isDictionaryReadSupported(dt arrow.DataType) bool {
}
func arrowTimestampToLogical(typ *arrow.TimestampType, unit arrow.TimeUnit)
schema.LogicalType {
- utc := typ.TimeZone == "" || typ.TimeZone == "UTC"
+ isAdjustedToUTC := typ.TimeZone != ""
// for forward compatibility reasons, and because there's no other way
// to signal to old readers that values are timestamps, we force
@@ -146,7 +146,7 @@ func arrowTimestampToLogical(typ *arrow.TimestampType, unit
arrow.TimeUnit) sche
return schema.NoLogicalType{}
}
- return schema.NewTimestampLogicalTypeForce(utc, scunit)
+ return schema.NewTimestampLogicalTypeForce(isAdjustedToUTC, scunit)
}
func getTimestampMeta(typ *arrow.TimestampType, props
*parquet.WriterProperties, arrprops ArrowWriterProperties) (parquet.Type,
schema.LogicalType, error) {
@@ -519,9 +519,12 @@ func arrowTime64(logical *schema.TimeLogicalType)
(arrow.DataType, error) {
}
func arrowTimestamp(logical *schema.TimestampLogicalType) (arrow.DataType,
error) {
- tz := "UTC"
- if logical.IsFromConvertedType() {
- tz = ""
+ tz := ""
+
+ // ConvertedTypes are adjusted to UTC per backward compatibility
guidelines
+ //
https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/LogicalTypes.md?plain=1#L480-L485
+ if logical.IsAdjustedToUTC() || logical.IsFromConvertedType() {
+ tz = "UTC"
}
switch logical.TimeUnit() {
diff --git a/go/parquet/pqarrow/schema_test.go
b/go/parquet/pqarrow/schema_test.go
index a3c2c7a4ff..f320b90303 100644
--- a/go/parquet/pqarrow/schema_test.go
+++ b/go/parquet/pqarrow/schema_test.go
@@ -304,7 +304,7 @@ func TestCoerceTImestampV1(t *testing.T) {
arrowFields := make([]arrow.Field, 0)
parquetFields = append(parquetFields,
schema.Must(schema.NewPrimitiveNodeLogical("timestamp",
parquet.Repetitions.Required,
- schema.NewTimestampLogicalTypeForce(false,
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
+ schema.NewTimestampLogicalTypeForce(true,
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type:
&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "EST"}})
arrowSchema := arrow.NewSchema(arrowFields, nil)
@@ -323,11 +323,11 @@ func TestAutoCoerceTImestampV1(t *testing.T) {
arrowFields := make([]arrow.Field, 0)
parquetFields = append(parquetFields,
schema.Must(schema.NewPrimitiveNodeLogical("timestamp",
parquet.Repetitions.Required,
- schema.NewTimestampLogicalTypeForce(false,
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
+ schema.NewTimestampLogicalTypeForce(true,
schema.TimeUnitMicros), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp", Type:
&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "EST"}})
parquetFields = append(parquetFields,
schema.Must(schema.NewPrimitiveNodeLogical("timestamp[ms]",
parquet.Repetitions.Required,
- schema.NewTimestampLogicalTypeForce(true,
schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
+ schema.NewTimestampLogicalTypeForce(false,
schema.TimeUnitMillis), parquet.Types.Int64, 0, -1)))
arrowFields = append(arrowFields, arrow.Field{Name: "timestamp[ms]",
Type: &arrow.TimestampType{Unit: arrow.Second}})
arrowSchema := arrow.NewSchema(arrowFields, nil)