RustedBones commented on code in PR #33422:
URL: https://github.com/apache/beam/pull/33422#discussion_r1918600356
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -69,11 +71,15 @@ public class AvroGenericRecordToStorageApiProto {
// A map of supported logical types to the protobuf field type.
static final Map<String, TableFieldSchema.Type> LOGICAL_TYPES =
ImmutableMap.<String, TableFieldSchema.Type>builder()
- .put(LogicalTypes.date().getName(), TableFieldSchema.Type.DATE)
- .put(LogicalTypes.decimal(1).getName(), TableFieldSchema.Type.BIGNUMERIC)
- .put(LogicalTypes.timestampMicros().getName(), TableFieldSchema.Type.TIMESTAMP)
- .put(LogicalTypes.timestampMillis().getName(), TableFieldSchema.Type.TIMESTAMP)
- .put(LogicalTypes.uuid().getName(), TableFieldSchema.Type.STRING)
+ .put("date", TableFieldSchema.Type.DATE)
Review Comment:
The two forms are equivalent. I switched to string literals because the type
name for `decimal` is not accessible, so getting it requires creating a 'fake'
logical-type instance:
```java
LogicalTypes.decimal(1).getName()
```
I can revert to the old style if that's preferred.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -213,7 +281,7 @@ public static DynamicMessage messageFromGenericRecord(
return builder.build();
}
- private static TableFieldSchema fieldDescriptorFromAvroField(Schema.Field field) {
+ private static TableFieldSchema fieldDescriptorFromAvroField(org.apache.avro.Schema.Field field) {
Review Comment:
I think the compiler asked for the fully qualified name. I will revert this.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java:
##########
@@ -115,34 +120,90 @@ static String convertUUID(Object value) {
}
static Long convertTimestamp(Object value, boolean micros) {
- if (value instanceof ReadableInstant) {
- return ((ReadableInstant) value).getMillis() * (micros ? 1000 : 1);
+ if (value instanceof org.joda.time.ReadableInstant) {
+ return ((org.joda.time.ReadableInstant) value).getMillis() * 1_000L;
+ } else if (value instanceof java.time.Instant) {
+ java.time.Instant instant = (java.time.Instant) value;
+ long seconds = instant.getEpochSecond();
+ int nanos = instant.getNano();
+
+ if (seconds < 0 && nanos > 0) {
+ long ms = Math.multiplyExact(seconds + 1, 1_000_000L);
+ long adjustment = (nanos / 1_000L) - 1_000_000L;
+ return Math.addExact(ms, adjustment);
+ } else {
+ long ms = Math.multiplyExact(seconds, 1_000_000L);
+ return Math.addExact(ms, nanos / 1_000L);
+ }
} else {
Preconditions.checkArgument(
- value instanceof Long, "Expecting a value as Long type (millis).");
- return (Long) value;
+ value instanceof Long, "Expecting a value as Long type (timestamp).");
+ return ((Long) value) * (micros ? 1 : 1_000L);
}
}
static Integer convertDate(Object value) {
- if (value instanceof ReadableInstant) {
- return Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
+ if (value instanceof org.joda.time.LocalDate) {
+ return org.joda.time.Days.daysBetween(EPOCH_DATE, (org.joda.time.LocalDate) value).getDays();
+ } else if (value instanceof java.time.LocalDate) {
+ return (int) ((java.time.LocalDate) value).toEpochDay();
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (days).");
return (Integer) value;
}
}
+ static Long convertTime(Object value, boolean micros) {
+ if (value instanceof org.joda.time.LocalTime) {
+ return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay();
+ } else if (value instanceof java.time.LocalTime) {
+ return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(
+ ((java.time.LocalTime) value).toNanoOfDay());
+ } else {
+ Preconditions.checkArgument(value instanceof Long, "Expecting a value as Long type (time).");
+ return (Long) value * (micros ? 1 : 1_000L);
+ }
+ }
+
+ static Long convertDateTime(Object value, boolean micros) {
+ if (value instanceof org.joda.time.LocalDateTime) {
+ // we should never come here as local-timestamp has been added after joda deprecation
+ // implement nonetheless for consistency
+ org.joda.time.DateTime dateTime =
+ ((org.joda.time.LocalDateTime) value).toDateTime(org.joda.time.DateTimeZone.UTC);
+ return 1_000L * dateTime.getMillis();
+ } else if (value instanceof java.time.LocalDateTime) {
+ java.time.Instant instant =
+ ((java.time.LocalDateTime) value).toInstant(java.time.ZoneOffset.UTC);
+ return convertTimestamp(instant, micros);
+ } else {
+ Preconditions.checkArgument(
+ value instanceof Long, "Expecting a value as Long type (local-timestamp).");
+ return ((Long) value) * (micros ? 1 : 1_000L);
+ }
+ }
+
static ByteString convertDecimal(LogicalType logicalType, Object value) {
- ByteBuffer byteBuffer = (ByteBuffer) value;
- BigDecimal bigDecimal =
- new Conversions.DecimalConversion()
- .fromBytes(
- byteBuffer.duplicate(),
- Schema.create(Schema.Type.NULL), // dummy schema, not used
- logicalType);
- return BeamRowToStorageApiProto.serializeBigDecimalToNumeric(bigDecimal);
+ if (value instanceof BigDecimal) {
+ LogicalTypes.Decimal type = (LogicalTypes.Decimal) logicalType;
+ BigDecimal bigDecimal =
+ ((BigDecimal) value)
+ .setScale(type.getScale(), RoundingMode.DOWN)
+ .round(new MathContext(type.getPrecision(), RoundingMode.DOWN));
Review Comment:
All other conversions accept both the logical-type value and the
underlying-type value. Supporting both is mandatory: even if the logical type
is present on the schema, it is ignored when the `GenericData` used for
serialization does not have the feature enabled, and the value then arrives as
the underlying type.
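To illustrate the "accept both" pattern, here is a minimal sketch using only `java.time`. The class and method names (`LogicalOrRawSketch`, `toEpochDays`, `toEpochMicros`) are hypothetical and not part of the PR, and the sketch leaves out the Joda branches and the special handling of negative seconds that the diff adds to `convertTimestamp`.

```java
import java.time.Instant;
import java.time.LocalDate;

// Hypothetical illustration, not the PR's code.
final class LogicalOrRawSketch {

  // Accepts the converted logical value (LocalDate) or the raw Avro int (days since epoch).
  static int toEpochDays(Object value) {
    if (value instanceof LocalDate) {
      return Math.toIntExact(((LocalDate) value).toEpochDay());
    }
    if (value instanceof Integer) {
      return (Integer) value;
    }
    throw new IllegalArgumentException("Expecting LocalDate or Integer (days), got: " + value);
  }

  // Accepts the converted logical value (Instant) or the raw Avro long.
  // For a raw long, `micros` says whether it is already in microseconds
  // (timestamp-micros) or still in milliseconds (timestamp-millis).
  static long toEpochMicros(Object value, boolean micros) {
    if (value instanceof Instant) {
      Instant instant = (Instant) value;
      // getEpochSecond() is floored and getNano() is never negative,
      // so this sum is also correct for instants before the epoch.
      long wholeSeconds = Math.multiplyExact(instant.getEpochSecond(), 1_000_000L);
      return Math.addExact(wholeSeconds, instant.getNano() / 1_000L);
    }
    if (value instanceof Long) {
      long raw = (Long) value;
      return micros ? raw : Math.multiplyExact(raw, 1_000L);
    }
    throw new IllegalArgumentException("Expecting Instant or Long (timestamp), got: " + value);
  }
}
```

With this shape, a record produced with conversions enabled (carrying `Instant` or `LocalDate`) and one produced without them (carrying `Long` or `Integer`) both end up as the same numeric value.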
Concerning the rounding, the
[doc](https://cloud.google.com/bigquery/docs/write-api#data_type_conversions)
says to use
[BigDecimalByteStringEncoder](https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java).
`BeamRowToStorageApiProto` is a copy of that.
However, it does not support parameterized `NUMERIC` and `BIGNUMERIC` types.
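For reference, the effect of the `setScale(...)` followed by `round(...)` chain in the diff can be reproduced with plain `java.math`. This is a sketch only: the class and method names are hypothetical, and the precision and scale arguments stand in for what the diff reads from `LogicalTypes.Decimal`.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

// Hypothetical illustration, not the PR's code.
final class DecimalTruncationSketch {

  // First truncate the fractional digits to `scale`, then truncate the
  // total number of significant digits to `precision`. RoundingMode.DOWN
  // truncates toward zero in both steps.
  static BigDecimal fitToDecimalType(BigDecimal value, int precision, int scale) {
    return value
        .setScale(scale, RoundingMode.DOWN)
        .round(new MathContext(precision, RoundingMode.DOWN));
  }

  public static void main(String[] args) {
    // Enough precision: only the scale step has an effect.
    System.out.println(fitToDecimalType(new BigDecimal("123.456789"), 10, 2)); // 123.45
    // Negative values are truncated toward zero as well.
    System.out.println(fitToDecimalType(new BigDecimal("-1.239"), 10, 2)); // -1.23
    // Too little precision: round() drops further digits and lowers the scale.
    System.out.println(fitToDecimalType(new BigDecimal("123.456789"), 4, 2)); // 123.4
  }
}
```

Note that when the value has more significant digits than the declared precision, the second step silently drops digits instead of failing, so the result may no longer have the declared scale.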
--
This is an automated message from the Apache Git Service.