DanielMorales9 commented on code in PR #32688:
URL: https://github.com/apache/beam/pull/32688#discussion_r1795164130


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java:
##########
@@ -322,6 +354,55 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row
     }
   }
 
+  /**
+   * Returns the appropriate value for an Iceberg timestamp field
+   *
+   * <p>If `timestamp`, we resolve incoming values to a {@link LocalDateTime}.
+   *
+   * <p>If `timestamptz`, we resolve to a UTC {@link OffsetDateTime}. Iceberg already resolves all
+   * incoming timestamps to UTC, so there is no harm in doing it from our side.
+   *
+   * <p>Valid types are:
+   *
+   * <ul>
+   *   <li>{@link SqlTypes.DATETIME} --> {@link LocalDateTime}
+   *   <li>{@link Schema.FieldType.DATETIME} --> {@link Instant}
+   *   <li>{@link Schema.FieldType.INT64} --> {@link Long}
+   *   <li>{@link Schema.FieldType.STRING} --> {@link String}
+   * </ul>
+   */
+  private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {

Review Comment:
   IMO, method overloading makes the code cleaner and easier to follow, and it
   avoids type-checking blocks (i.e., `instanceof`), which reduces complexity.
   
   ```java
   private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) {
       if (shouldAdjustToUtc) {
           return convertBeamTimestampToUTC(beamValue);
       } else {
           return convertBeamTimestampWithoutUTC(beamValue);
       }
   }

   // Overloaded methods for different types for UTC conversion
   private static Object convertBeamTimestampToUTC(LocalDateTime beamValue) {
       return OffsetDateTime.of(beamValue, ZoneOffset.UTC);
   }

   private static Object convertBeamTimestampToUTC(Instant beamValue) {
       // Instant carries milliseconds; DateTimeUtil expects microseconds
       return DateTimeUtil.timestamptzFromMicros(beamValue.toEpochMilli() * 1000L);
   }

   private static Object convertBeamTimestampToUTC(Long beamValue) {
       // Long values are passed through as microseconds
       return DateTimeUtil.timestamptzFromMicros(beamValue);
   }

   private static Object convertBeamTimestampToUTC(String beamValue) {
       return OffsetDateTime.parse(beamValue).withOffsetSameInstant(ZoneOffset.UTC);
   }

   // Overloaded methods for different types for non-UTC conversion
   private static Object convertBeamTimestampWithoutUTC(LocalDateTime beamValue) {
       return beamValue;
   }

   private static Object convertBeamTimestampWithoutUTC(Instant beamValue) {
       // Instant carries milliseconds; DateTimeUtil expects microseconds
       return DateTimeUtil.timestampFromMicros(beamValue.toEpochMilli() * 1000L);
   }

   private static Object convertBeamTimestampWithoutUTC(Long beamValue) {
       // Long values are passed through as microseconds
       return DateTimeUtil.timestampFromMicros(beamValue);
   }

   private static Object convertBeamTimestampWithoutUTC(String beamValue) {
       return LocalDateTime.parse(beamValue);
   }

   // Fallback methods for unsupported types
   private static Object convertBeamTimestampToUTC(Object beamValue) {
       throw unsupportedBeamTimestampException(beamValue);
   }

   private static Object convertBeamTimestampWithoutUTC(Object beamValue) {
       throw unsupportedBeamTimestampException(beamValue);
   }

   private static UnsupportedOperationException unsupportedBeamTimestampException(Object beamValue) {
       return new UnsupportedOperationException(
           "Unsupported Beam type for Iceberg timestamp: " + beamValue.getClass().getName());
   }
   ```
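
   One caveat with the sketch above: Java picks an overload at compile time
   from the static type of the argument. Since `beamValue` is declared as
   `Object` in `getIcebergTimestampValue`, the calls would bind to the
   `Object` fallback unless the value is narrowed first. The standalone
   example below illustrates this with only two overloads; the class name
   `OverloadDispatchSketch` and the methods `convertToUtc`, `viaOverloadOnly`
   and `viaRuntimeCheck` are hypothetical and not part of the PR.

   ```java
   import java.time.LocalDateTime;
   import java.time.OffsetDateTime;
   import java.time.ZoneOffset;

   // Hypothetical standalone class, used only to illustrate overload resolution.
   class OverloadDispatchSketch {

     // Type-specific overload, mirroring the LocalDateTime case above.
     static Object convertToUtc(LocalDateTime value) {
       return OffsetDateTime.of(value, ZoneOffset.UTC);
     }

     // Fallback overload for unsupported types.
     static Object convertToUtc(Object value) {
       throw new UnsupportedOperationException(
           "Unsupported Beam type for Iceberg timestamp: " + value.getClass().getName());
     }

     // The argument's static type is Object, so the compiler binds this call
     // to convertToUtc(Object), whatever the runtime type is.
     static Object viaOverloadOnly(Object beamValue) {
       return convertToUtc(beamValue);
     }

     // A runtime check plus a cast lets the compiler select the specific overload.
     static Object viaRuntimeCheck(Object beamValue) {
       if (beamValue instanceof LocalDateTime) {
         return convertToUtc((LocalDateTime) beamValue);
       }
       return convertToUtc(beamValue);
     }

     public static void main(String[] args) {
       Object value = LocalDateTime.of(2024, 10, 10, 12, 0);
       System.out.println("runtime check: " + viaRuntimeCheck(value));
       try {
         viaOverloadOnly(value);
       } catch (UnsupportedOperationException e) {
         System.out.println("overload only: " + e.getMessage());
       }
     }
   }
   ```

   So the overloads can still keep each conversion small and readable, but
   the dispatching method would likely need one type check per supported
   type (or a similar lookup) before calling them.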


