This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aec5d12953 Bugfix for Timestamp Overflow in BigQueryIO's StorageWriteAPI (#27413)
6aec5d12953 is described below

commit 6aec5d12953b209856b638c208b30425876279ea
Author: RyuSA <[email protected]>
AuthorDate: Fri Jul 21 23:33:37 2023 +0900

    Bugfix for Timestamp Overflow in BigQueryIO's StorageWriteAPI (#27413)
    
    * Add a new unit test for BigQueryIO.
    
    BigQuery must accept timestamps up to 9999-12-31 23:59:59.
    
    * Fix overflow logic (#27413)
    
    * fix checkstyle
    
    remove unused imports
    
    * apply checkstyle
    
    * apply checkstyle
    
    * simplify the unittest
    
    remove testTimestampTypeConversion, add a new test case
---
 .../io/gcp/bigquery/TableRowToStorageApiProto.java | 22 ++++++++++++--------
 .../bigquery/TableRowToStorageApiProtoTest.java    | 24 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 11 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
index b8dbb9703c2..d231d84aea2 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java
@@ -48,7 +48,6 @@ import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.time.format.DateTimeFormatterBuilder;
 import java.time.format.DateTimeParseException;
-import java.time.temporal.ChronoUnit;
 import java.util.AbstractMap;
 import java.util.Collections;
 import java.util.List;
@@ -825,23 +824,23 @@ public class TableRowToStorageApiProto {
           try {
             // '2011-12-03T10:15:30Z', '2011-12-03 10:15:30+05:00'
             // '2011-12-03 10:15:30 UTC', '2011-12-03T10:15:30 
America/New_York'
-            return ChronoUnit.MICROS.between(
-                Instant.EPOCH, Instant.from(TIMESTAMP_FORMATTER.parse((String) 
value)));
+            Instant timestamp = 
Instant.from(TIMESTAMP_FORMATTER.parse((String) value));
+            return toEpochMicros(timestamp);
           } catch (DateTimeException e) {
             try {
               // for backwards compatibility, default time zone is UTC for 
values with no time-zone
               // '2011-12-03T10:15:30'
-              return ChronoUnit.MICROS.between(
-                  Instant.EPOCH,
-                  
Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String) 
value)));
+              Instant timestamp =
+                  
Instant.from(TIMESTAMP_FORMATTER.withZone(ZoneOffset.UTC).parse((String) 
value));
+              return toEpochMicros(timestamp);
             } catch (DateTimeParseException err) {
               // "12345667"
-              return ChronoUnit.MICROS.between(
-                  Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) 
value)));
+              Instant timestamp = Instant.ofEpochMilli(Long.parseLong((String) 
value));
+              return toEpochMicros(timestamp);
             }
           }
         } else if (value instanceof Instant) {
-          return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+          return toEpochMicros((Instant) value);
         } else if (value instanceof org.joda.time.Instant) {
           // joda instant precision is millisecond
           return ((org.joda.time.Instant) value).getMillis() * 1000L;
@@ -972,6 +971,11 @@ public class TableRowToStorageApiProto {
             + schemaInformation.getType());
   }
 
+  private static long toEpochMicros(Instant timestamp) {
+    // e.g. 1970-01-01T00:01:01.000040Z: 61 * 1000_000L + 40000 / 1000 = 61000040 (nanos below 1 microsecond are truncated)
+    return timestamp.getEpochSecond() * 1000_000L + timestamp.getNano() / 1000;
+  }
+
   @VisibleForTesting
   public static TableRow tableRowFromMessage(Message message, boolean 
includeCdcColumns) {
     // TODO: Would be more correct to generate TableRows using setF.
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
index 58f181700d7..90be99fce84 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoTest.java
@@ -114,6 +114,7 @@ public class TableRowToStorageApiProtoTest {
                           .setType("TIMESTAMP")
                           .setName("timestampValueSpaceTrailingZero"))
                   .add(new 
TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace"))
+                  .add(new 
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
                   .build());
 
   private static final TableSchema BASE_TABLE_SCHEMA_NO_F =
@@ -163,6 +164,7 @@ public class TableRowToStorageApiProtoTest {
                           .setType("TIMESTAMP")
                           .setName("timestampValueSpaceTrailingZero"))
                   .add(new 
TableFieldSchema().setType("DATETIME").setName("datetimeValueSpace"))
+                  .add(new 
TableFieldSchema().setType("TIMESTAMP").setName("timestampValueMaximum"))
                   .build());
 
   private static final DescriptorProto BASE_TABLE_SCHEMA_PROTO =
@@ -356,6 +358,13 @@ public class TableRowToStorageApiProtoTest {
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvaluemaximum")
+                  .setNumber(28)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
           .build();
 
   private static final DescriptorProto BASE_TABLE_SCHEMA_NO_F_PROTO =
@@ -542,6 +551,13 @@ public class TableRowToStorageApiProtoTest {
                   .setType(Type.TYPE_INT64)
                   .setLabel(Label.LABEL_OPTIONAL)
                   .build())
+          .addField(
+              FieldDescriptorProto.newBuilder()
+                  .setName("timestampvaluemaximum")
+                  .setNumber(27)
+                  .setType(Type.TYPE_INT64)
+                  .setLabel(Label.LABEL_OPTIONAL)
+                  .build())
           .build();
   private static final TableSchema NESTED_TABLE_SCHEMA =
       new TableSchema()
@@ -689,7 +705,8 @@ public class TableRowToStorageApiProtoTest {
                   new TableCell().setV("1970-01-01 00:00:00.123456 
America/New_York"),
                   new TableCell().setV("1970-01-01 00:00:00.123"),
                   new TableCell().setV("1970-01-01 00:00:00.1230"),
-                  new TableCell().setV("2019-08-16 00:52:07.123456")));
+                  new TableCell().setV("2019-08-16 00:52:07.123456"),
+                  new TableCell().setV("9999-12-31 23:59:59.999999Z")));
 
   private static final TableRow BASE_TABLE_ROW_NO_F =
       new TableRow()
@@ -721,7 +738,8 @@ public class TableRowToStorageApiProtoTest {
           .set("timestampValueZoneRegion", "1970-01-01 00:00:00.123456 
America/New_York")
           .set("timestampValueSpaceMilli", "1970-01-01 00:00:00.123")
           .set("timestampValueSpaceTrailingZero", "1970-01-01 00:00:00.1230")
-          .set("datetimeValueSpace", "2019-08-16 00:52:07.123456");
+          .set("datetimeValueSpace", "2019-08-16 00:52:07.123456")
+          .set("timestampValueMaximum", "9999-12-31 23:59:59.999999Z");
 
   private static final Map<String, Object> BASE_ROW_EXPECTED_PROTO_VALUES =
       ImmutableMap.<String, Object>builder()
@@ -761,6 +779,7 @@ public class TableRowToStorageApiProtoTest {
           .put("timestampvaluespacemilli", 123000L)
           .put("timestampvaluespacetrailingzero", 123000L)
           .put("datetimevaluespace", 142111881387172416L)
+          .put("timestampvaluemaximum", 253402300799999999L)
           .build();
 
   private static final Map<String, Object> BASE_ROW_NO_F_EXPECTED_PROTO_VALUES 
=
@@ -800,6 +819,7 @@ public class TableRowToStorageApiProtoTest {
           .put("timestampvaluespacemilli", 123000L)
           .put("timestampvaluespacetrailingzero", 123000L)
           .put("datetimevaluespace", 142111881387172416L)
+          .put("timestampvaluemaximum", 253402300799999999L)
           .build();
 
   private void assertBaseRecord(DynamicMessage msg, boolean withF) {

Reply via email to