zeroshade commented on code in PR #1112:
URL: https://github.com/apache/iceberg-go/pull/1112#discussion_r3290531123
##########
table/partitioned_fanout_writer.go:
##########
@@ -51,6 +52,12 @@ type partitionInfo struct {
partitionRec partitionRecord // The actual partition values for generating the path
}
+type partitionFieldInfo struct {
+ sourceField *iceberg.PartitionField
Review Comment:
PartitionField is a small struct; why use a pointer here instead of just
using it by value?
##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row int) (iceberg.Literal
return nil, fmt.Errorf("unsupported value type: %T", val)
}
}
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType iceberg.Type) (iceberg.Literal, error) {
+ timestampType := arr.DataType().(*arrow.TimestampType)
+ value := int64(arr.Value(row))
+
+ switch sourceType.(type) {
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+ case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
+ nanos, err := arrowTimestampToNanos(value, timestampType.Unit)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewLiteral(iceberg.TimestampNano(nanos)), nil
+ default:
+ return nil, fmt.Errorf("cannot convert arrow timestamp to iceberg literal for source type %v", sourceType)
+ }
+}
+
+func arrowTimestampToMicros(value int64, unit arrow.TimeUnit) (int64, error) {
+ switch unit {
+ case arrow.Second:
+ return scaleTimestamp(value, 1_000_000)
+ case arrow.Millisecond:
+ return scaleTimestamp(value, 1_000)
+ case arrow.Microsecond:
+ return value, nil
+ case arrow.Nanosecond:
+ return floorDivInt64(value, 1_000), nil
+ default:
+ return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", unit)
+ }
+}
+
+func arrowTimestampToNanos(value int64, unit arrow.TimeUnit) (int64, error) {
+ switch unit {
+ case arrow.Second:
+ return scaleTimestamp(value, 1_000_000_000)
+ case arrow.Millisecond:
+ return scaleTimestamp(value, 1_000_000)
+ case arrow.Microsecond:
+ return scaleTimestamp(value, 1_000)
+ case arrow.Nanosecond:
+ return value, nil
+ default:
+ return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", unit)
+ }
+}
+
+func scaleTimestamp(value, factor int64) (int64, error) {
+ if (value > 0 && value > math.MaxInt64/factor) ||
+ (value < 0 && value < math.MinInt64/factor) {
+ return 0, fmt.Errorf("arrow timestamp value %d overflows int64 when scaled by %d", value, factor)
+ }
+
+ return value * factor, nil
+}
+
+func floorDivInt64(a, b int64) int64 {
+ d := a / b
+ if (a^b) < 0 && d*b != a {
+ d--
+ }
+
+ return d
+}
Review Comment:
This already exists in the root transforms.go file. We should probably just
move the version at transforms.go:579 into an internal/utils.go file and then
use it in both places rather than duplicating this function.
##########
table/partitioned_fanout_writer.go:
##########
@@ -214,28 +221,33 @@ func getRecordPartitions(spec iceberg.PartitionSpec, schema *iceberg.Schema, rec
partitionRec := make(partitionRecord, len(partitionFields))
partitionColumns := make([]arrow.Array, len(partitionFields))
- partitionFieldsInfo := make([]struct {
- sourceField *iceberg.PartitionField
- fieldID int
- }, len(partitionFields))
+ partitionFieldsInfo := make([]partitionFieldInfo, len(partitionFields))
for i := range partitionFields {
sourceField := spec.Field(i)
- colName, _ := schema.FindColumnName(sourceField.SourceID())
- colIdx := record.Schema().FieldIndices(colName)[0]
- partitionColumns[i] = record.Column(colIdx)
- partitionFieldsInfo[i] = struct {
- sourceField *iceberg.PartitionField
- fieldID int
- }{&sourceField, sourceField.FieldID}
+ colName, ok := schema.FindColumnName(sourceField.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID())
+ }
+ colIndices := record.Schema().FieldIndices(colName)
+ if len(colIndices) == 0 {
+ return nil, fmt.Errorf("failed to find source column %q in record schema", colName)
+ }
+ sourceType, ok := schema.FindTypeByID(sourceField.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID())
+ }
+ partitionColumns[i] = record.Column(colIndices[0])
+ partitionFieldsInfo[i] = partitionFieldInfo{&sourceField, sourceField.FieldID, sourceType}
Review Comment:
```suggestion
partitionFieldsInfo[i] = partitionFieldInfo{
sourceField: &sourceField,
fieldID: sourceField.FieldID,
sourceType: sourceType,
}
```
Just so we don't accidentally misorder the fields.
##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row int) (iceberg.Literal
return nil, fmt.Errorf("unsupported value type: %T", val)
}
}
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType iceberg.Type) (iceberg.Literal, error) {
+ timestampType := arr.DataType().(*arrow.TimestampType)
+ value := int64(arr.Value(row))
+
+ switch sourceType.(type) {
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+ case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
+ nanos, err := arrowTimestampToNanos(value, timestampType.Unit)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewLiteral(iceberg.TimestampNano(nanos)), nil
+ default:
+ return nil, fmt.Errorf("cannot convert arrow timestamp to iceberg literal for source type %v", sourceType)
+ }
+}
+
+func arrowTimestampToMicros(value int64, unit arrow.TimeUnit) (int64, error) {
+ switch unit {
+ case arrow.Second:
+ return scaleTimestamp(value, 1_000_000)
+ case arrow.Millisecond:
+ return scaleTimestamp(value, 1_000)
+ case arrow.Microsecond:
+ return value, nil
+ case arrow.Nanosecond:
+ return floorDivInt64(value, 1_000), nil
+ default:
+ return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", unit)
+ }
+}
+
+func arrowTimestampToNanos(value int64, unit arrow.TimeUnit) (int64, error) {
+ switch unit {
+ case arrow.Second:
+ return scaleTimestamp(value, 1_000_000_000)
+ case arrow.Millisecond:
+ return scaleTimestamp(value, 1_000_000)
+ case arrow.Microsecond:
+ return scaleTimestamp(value, 1_000)
+ case arrow.Nanosecond:
+ return value, nil
+ default:
+ return 0, fmt.Errorf("unsupported arrow timestamp unit: %s", unit)
+ }
+}
+
+func scaleTimestamp(value, factor int64) (int64, error) {
+ if (value > 0 && value > math.MaxInt64/factor) ||
+ (value < 0 && value < math.MinInt64/factor) {
+ return 0, fmt.Errorf("arrow timestamp value %d overflows int64 when scaled by %d", value, factor)
+ }
Review Comment:
Can you add a test that covers this overflow path? I don't think it's covered by the
current tests.
##########
table/partitioned_fanout_writer.go:
##########
@@ -464,3 +472,75 @@ func getArrowValueAsIcebergLiteral(column arrow.Array, row int) (iceberg.Literal
return nil, fmt.Errorf("unsupported value type: %T", val)
}
}
+
+func timestampLiteralFromArrow(arr *array.Timestamp, row int, sourceType iceberg.Type) (iceberg.Literal, error) {
+ timestampType := arr.DataType().(*arrow.TimestampType)
+ value := int64(arr.Value(row))
+
+ switch sourceType.(type) {
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ micros, err := arrowTimestampToMicros(value, timestampType.Unit)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewLiteral(iceberg.Timestamp(micros)), nil
+ case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
Review Comment:
The Tz variants don't seem to be tested. Can you add cases that have
`TimeZone: "UTC"` so we hit this case?
##########
table/partitioned_fanout_writer.go:
##########
@@ -214,28 +221,33 @@ func getRecordPartitions(spec iceberg.PartitionSpec, schema *iceberg.Schema, rec
partitionRec := make(partitionRecord, len(partitionFields))
partitionColumns := make([]arrow.Array, len(partitionFields))
- partitionFieldsInfo := make([]struct {
- sourceField *iceberg.PartitionField
- fieldID int
- }, len(partitionFields))
+ partitionFieldsInfo := make([]partitionFieldInfo, len(partitionFields))
for i := range partitionFields {
sourceField := spec.Field(i)
- colName, _ := schema.FindColumnName(sourceField.SourceID())
- colIdx := record.Schema().FieldIndices(colName)[0]
- partitionColumns[i] = record.Column(colIdx)
- partitionFieldsInfo[i] = struct {
- sourceField *iceberg.PartitionField
- fieldID int
- }{&sourceField, sourceField.FieldID}
+ colName, ok := schema.FindColumnName(sourceField.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID())
+ }
+ colIndices := record.Schema().FieldIndices(colName)
+ if len(colIndices) == 0 {
+ return nil, fmt.Errorf("failed to find source column %q in record schema", colName)
+ }
+ sourceType, ok := schema.FindTypeByID(sourceField.SourceID())
+ if !ok {
+ return nil, fmt.Errorf("failed to find source field ID %d in schema", sourceField.SourceID())
Review Comment:
Can we use something like "failed to find type for source field ID" to
distinguish this error from the identical one above?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]