DanielMorales9 commented on code in PR #32688:
URL: https://github.com/apache/beam/pull/32688#discussion_r1795164130
##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -322,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec,
Types.NestedField field, Row
}
}
+ /**
+ * Returns the appropriate value for an Iceberg timestamp field
+ *
+ * <p>If `timestamp`, we resolve incoming values to a {@link LocalDateTime}.
+ *
+ * <p>If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg
already resolves all
+ * incoming timestamps to UTC, so there is no harm in doing it from our side.
+ *
+ * <p>Valid types are:
+ *
+ * <ul>
+ * <li>{@link SqlTypes.DATETIME} --> {@link LocalDateTime}
+ * <li>{@link Schema.FieldType.DATETIME} --> {@link Instant}
+ * <li>{@link Schema.FieldType.INT64} --> {@link Long}
+ * <li>{@link Schema.FieldType.STRING} --> {@link String}
+ * </ul>
+ */
+ private static Object getIcebergTimestampValue(Object beamValue, boolean
shouldAdjustToUtc) {
Review Comment:
IMO, method overloading makes the code cleaner and easier to follow, and it
avoids type-checking blocks (i.e., `instanceof`), reducing complexity.
```
/**
 * Converts a Beam timestamp value into the value written to an Iceberg timestamp field.
 *
 * @param beamValue the incoming Beam value
 * @param shouldAdjustToUtc when {@code true} the UTC conversion path is taken, otherwise the
 *     non-UTC path
 * @return whatever the selected conversion method returns
 */
private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
  // beamValue is statically typed as Object, so both calls below are bound at compile time to
  // the overloads that take an Object parameter; the runtime type of beamValue does not
  // influence which overload is chosen here.
  return shouldAdjustToUtc
      ? convertBeamTimestampToUTC(beamValue)
      : convertBeamTimestampWithoutUTC(beamValue);
}
// Typed overloads for the UTC conversion path, one per supported input type.

/** Pairs the given local date-time with the UTC offset. */
private static Object convertBeamTimestampToUTC(LocalDateTime beamValue) {
  final OffsetDateTime utcValue = OffsetDateTime.of(beamValue, ZoneOffset.UTC);
  return utcValue;
}

/**
 * Scales the instant's epoch milliseconds to microseconds and delegates to
 * {@code DateTimeUtil.timestamptzFromMicros}.
 *
 * <p>NOTE(review): {@code toEpochMilli()} is the java.time.Instant accessor; if the Instant in
 * scope is Joda's, the equivalent is {@code getMillis()} -- confirm against the file's imports.
 */
private static Object convertBeamTimestampToUTC(Instant beamValue) {
  final long epochMicros = beamValue.toEpochMilli() * 1000L;
  return DateTimeUtil.timestamptzFromMicros(epochMicros);
}

/**
 * Passes the value straight to {@code DateTimeUtil.timestamptzFromMicros}; the value is
 * presumably already expressed in microseconds since the epoch -- verify against callers.
 */
private static Object convertBeamTimestampToUTC(Long beamValue) {
  final long epochMicros = beamValue;
  return DateTimeUtil.timestamptzFromMicros(epochMicros);
}

/** Parses the text as an offset date-time and shifts it to UTC, preserving the instant. */
private static Object convertBeamTimestampToUTC(String beamValue) {
  final OffsetDateTime parsed = OffsetDateTime.parse(beamValue);
  return parsed.withOffsetSameInstant(ZoneOffset.UTC);
}
// Typed overloads for the non-UTC conversion path, one per supported input type.

/** Returns the local date-time unchanged. */
private static Object convertBeamTimestampWithoutUTC(LocalDateTime beamValue) {
  final LocalDateTime unchanged = beamValue;
  return unchanged;
}

/**
 * Scales the instant's epoch milliseconds to microseconds and delegates to
 * {@code DateTimeUtil.timestampFromMicros}.
 *
 * <p>NOTE(review): {@code toEpochMilli()} is the java.time.Instant accessor; if the Instant in
 * scope is Joda's, the equivalent is {@code getMillis()} -- confirm against the file's imports.
 */
private static Object convertBeamTimestampWithoutUTC(Instant beamValue) {
  final long epochMicros = beamValue.toEpochMilli() * 1000L;
  return DateTimeUtil.timestampFromMicros(epochMicros);
}

/**
 * Passes the value straight to {@code DateTimeUtil.timestampFromMicros}; the value is
 * presumably already expressed in microseconds since the epoch -- verify against callers.
 */
private static Object convertBeamTimestampWithoutUTC(Long beamValue) {
  final long epochMicros = beamValue;
  return DateTimeUtil.timestampFromMicros(epochMicros);
}

/** Parses the text as a local (offset-less) date-time. */
private static Object convertBeamTimestampWithoutUTC(String beamValue) {
  final LocalDateTime parsed = LocalDateTime.parse(beamValue);
  return parsed;
}
/**
 * Runtime dispatcher and fallback for the UTC conversion path.
 *
 * <p>Java selects an overload from the <em>static</em> type of the argument, so a caller holding
 * an {@code Object} reference always lands here regardless of the value's runtime type. This
 * method therefore inspects the runtime type and forwards to the matching typed overload; only
 * values of an unsupported type (including {@code null}) are rejected.
 *
 * @param beamValue the incoming Beam value
 * @return the result of the typed overload matching {@code beamValue}'s runtime type
 * @throws UnsupportedOperationException if no typed overload accepts the value
 */
private static Object convertBeamTimestampToUTC(Object beamValue) {
  // The explicit casts make the compiler bind each call to the typed overload instead of
  // recursing into this Object overload.
  if (beamValue instanceof LocalDateTime) {
    return convertBeamTimestampToUTC((LocalDateTime) beamValue);
  }
  if (beamValue instanceof Instant) {
    return convertBeamTimestampToUTC((Instant) beamValue);
  }
  if (beamValue instanceof Long) {
    return convertBeamTimestampToUTC((Long) beamValue);
  }
  if (beamValue instanceof String) {
    return convertBeamTimestampToUTC((String) beamValue);
  }
  throw unsupportedBeamTimestampException(beamValue);
}
/**
 * Runtime dispatcher and fallback for the non-UTC conversion path.
 *
 * <p>Java selects an overload from the <em>static</em> type of the argument, so a caller holding
 * an {@code Object} reference always lands here regardless of the value's runtime type. This
 * method therefore inspects the runtime type and forwards to the matching typed overload; only
 * values of an unsupported type (including {@code null}) are rejected.
 *
 * @param beamValue the incoming Beam value
 * @return the result of the typed overload matching {@code beamValue}'s runtime type
 * @throws UnsupportedOperationException if no typed overload accepts the value
 */
private static Object convertBeamTimestampWithoutUTC(Object beamValue) {
  // The explicit casts make the compiler bind each call to the typed overload instead of
  // recursing into this Object overload.
  if (beamValue instanceof LocalDateTime) {
    return convertBeamTimestampWithoutUTC((LocalDateTime) beamValue);
  }
  if (beamValue instanceof Instant) {
    return convertBeamTimestampWithoutUTC((Instant) beamValue);
  }
  if (beamValue instanceof Long) {
    return convertBeamTimestampWithoutUTC((Long) beamValue);
  }
  if (beamValue instanceof String) {
    return convertBeamTimestampWithoutUTC((String) beamValue);
  }
  throw unsupportedBeamTimestampException(beamValue);
}
/**
 * Builds the exception reported when a Beam value cannot be converted to an Iceberg timestamp.
 *
 * @param beamValue the rejected value; may be {@code null}
 * @return an exception whose message names the runtime class of {@code beamValue}, or
 *     {@code "null"} when there is no value
 */
private static UnsupportedOperationException unsupportedBeamTimestampException(Object beamValue) {
  // A null value has no class to report; describe it as "null" so the caller still receives the
  // intended UnsupportedOperationException rather than a NullPointerException.
  final String typeName = beamValue == null ? "null" : beamValue.getClass().getName();
  return new UnsupportedOperationException(
      "Unsupported Beam type for Iceberg timestamp: " + typeName);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]