This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6aec5d12953 Bugfix for Timestamp Overflow in BigQueryIO's
StorageWriteAPI (#27413)
6aec5d12953 is described below
commit 6aec5d12953b209856b638c208b30425876279ea
Author: RyuSA <[email protected]>
AuthorDate: Fri Jul 21 23:33:37 2023 +0900
Bugfix for Timestamp Overflow in BigQueryIO's StorageWriteAPI (#27413)
* Add a new unit test for BigQueryIO.
BigQuery must accept timestamps up to 9999-12-31 23:59:59.
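* Fix overflow logic (#27413): compute epoch microseconds directly from the
Instant's seconds and nanos instead of using ChronoUnit.MICROS.between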
* fix checkstyle
remove unused imports
* apply checkstyle
* apply checkstyle
* simplify the unit test
remove testTimestampTypeConversion, add a new test case
---
.../io/gcp/bigquery/TableRowToStorageApiProto.java | 22 ++++++++++++--------
.../bigquery/TableRowToStorageApiProtoTest.java | 24 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 11 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index b8dbb9703c2..d231d84aea2 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -48,7 +48,6 @@ import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
-import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.List;
@@ -825,23 +824,23 @@ public class TableRowToStorageApiProto {
try {
// '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00'
// '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30
America/New_York'
- return ChronoUnit.MICROS.between(
- Instant.EPOCH, Instant.from(TIMESTAMP_FORMATTER.parse((String)
value)));
+ Instant timestamp =
Instant.from(TIMESTAMP_FORMATTER.parse((String) value));
+ return toEpochMicros(timestamp);
} catch (DateTimeException e) {
try {
// for backwards compatibility, default time zone is UTC for
values with no time-zone
// '2011-12-03T10:15:30'
- return ChronoUnit.MICROS.between(
- Instant.EPOCH,
-
Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String)
value)));
+ Instant timestamp =
+
Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String)
value));
+ return toEpochMicros(timestamp);
} catch (DateTimeParseException err) {
// "12345667"
- return ChronoUnit.MICROS.between(
- Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String)
value)));
+ Instant timestamp = Instant.ofEpochMilli(Long.parseLong((String)
value));
+ return toEpochMicros(timestamp);
}
}
} else if (value instanceof Instant) {
- return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+ return toEpochMicros((Instant) value);
} else if (value instanceof org.joda.time.Instant) {
// joda instant precision is millisecond
return ((org.joda.time.Instant) value).getMillis() * 1000L;
@@ -972,6 +971,11 @@ public class TableRowToStorageApiProto {
+ schemaInformation.getType());
}
+ private static long toEpochMicros(Instant timestamp) {
+ // Microseconds since epoch, e.g. 1970-01-01T00:01:01.000040Z: 61 * 1000_000L + 40000 / 1000 = 61000040
+ return timestamp.getEpochSecond() * 1000_000L + timestamp.getNano() / 1000;
+ }
+
@VisibleForTesting
public static TableRow tableRowFromMessage(Message message, boolean
includeCdcColumns) {
// TODO: Would be more correct to generate TableRows using setF.
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 58f181700d7..90be99fce84 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -114,6 +114,7 @@ public class TableRowToStorageApiProtoTest {
.setType("TIMESTAMP")
.setName("timestampValueSpaceTrailingZero"))
.add(new
TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace"))
+ .add(new
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
.build());
private static final TableSchema BASE_TABLE_SCHEMA_NO_F =
@@ -163,6 +164,7 @@ public class TableRowToStorageApiProtoTest {
.setType("TIMESTAMP")
.setName("timestampValueSpaceTrailingZero"))
.add(new
TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace"))
+ .add(new
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
.build());
private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
@@ -356,6 +358,13 @@ public class TableRowToStorageApiProtoTest {
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestampvaluemaximum")
+ .setNumber(28)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
.build();
private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -542,6 +551,13 @@ public class TableRowToStorageApiProtoTest {
.setType(Type.TYPE_INT64)
.setLabel(Label.LABEL_OPTIONAL)
.build())
+ .addField(
+ FieldDescriptorProto.newBuilder()
+ .setName("timestampvaluemaximum")
+ .setNumber(27)
+ .setType(Type.TYPE_INT64)
+ .setLabel(Label.LABEL_OPTIONAL)
+ .build())
.build();
private static final TableSchema NESTED_TABLE_SCHEMA =
new TableSchema()
@@ -689,7 +705,8 @@ public class TableRowToStorageApiProtoTest {
new TableCell().setV("1970-01-01 00:00:00.123456
America/New_York"),
new TableCell().setV("1970-01-01 00:00:00.123"),
new TableCell().setV("1970-01-01 00:00:00.1230"),
- new TableCell().setV("2019-08-16 00:52:07.123456")));
+ new TableCell().setV("2019-08-16 00:52:07.123456"),
+ new TableCell().setV("9999-12-31 23:59:59.999999Z")));
private static final TableRow BASE_TABLE_ROW_NO_F =
new TableRow()
@@ -721,7 +738,8 @@ public class TableRowToStorageApiProtoTest {
.set("timestampValueZoneRegion", "1970-01-01 00:00:00.123456
America/New_York")
.set("timestampValueSpaceMilli", "1970-01-01 00:00:00.123")
.set("timestampValueSpaceTrailingZero", "1970-01-01 00:00:00.1230")
- .set("datetimeValueSpace", "2019-08-16 00:52:07.123456");
+ .set("datetimeValueSpace", "2019-08-16 00:52:07.123456")
+ .set("timestampValueMaximum", "9999-12-31 23:59:59.999999Z");
private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
ImmutableMap.<String, Object>builder()
@@ -761,6 +779,7 @@ public class TableRowToStorageApiProtoTest {
.put("timestampvaluespacemilli", 123000L)
.put("timestampvaluespacetrailingzero", 123000L)
.put("datetimevaluespace", 142111881387172416L)
+ .put("timestampvaluemaximum", 253402300799999999L)
.build();
private static final Map<String, Object> BASE_ROW_NO_F_EXPECTED_PROTO_VALUES
=
@@ -800,6 +819,7 @@ public class TableRowToStorageApiProtoTest {
.put("timestampvaluespacemilli", 123000L)
.put("timestampvaluespacetrailingzero", 123000L)
.put("datetimevaluespace", 142111881387172416L)
+ .put("timestampvaluemaximum", 253402300799999999L)
.build();
private void assertBaseRecord(DynamicMessage msg, boolean withF) {