This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d94e348  [BEAM-7999] Fix BigQuery timestamp handling for Schema Aware 
PCollection
     new 5a62f94  Merge pull request #9369 from 
alexvanboxel/feature/BEAM-7999-bq-timestamp
d94e348 is described below

commit d94e3485da50b9dba09b77928d61c9f9ebad7597
Author: Alex Van Boxel <[email protected]>
AuthorDate: Sat Aug 17 15:17:35 2019 +0200

    [BEAM-7999] Fix BigQuery timestamp handling for Schema Aware PCollection
    
    This fixes the handling of timestamps when reading from BigQuery. It
    was assumed that BigQuery always outputs a Double, but reading a table
    directly outputs a textual version in the JSON. This is now handled.
    The conversion from a Beam Row now also outputs the BigQuery textual
    version instead of the ISO standard representation, which makes the
    implementation symmetric.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryUtils.java    |  68 ++++++++--
 .../sdk/io/gcp/bigquery/BigQueryUtilsTest.java     | 140 ++++++++++++++++++---
 2 files changed, 183 insertions(+), 25 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 011f0c4..04a92ec 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -49,6 +49,7 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
 import org.joda.time.chrono.ISOChronology;
@@ -90,6 +91,51 @@ public class BigQueryUtils {
     }
   }
 
+  private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;
+
+  /**
+   * Native BigQuery formatter for its timestamp format; depending on the 
milliseconds stored in
+   * the column, the fractional part will have 6 or 3 digits, or be absent. Example {@code 
2019-08-16
+   * 00:52:07[.123]|[.123456] UTC}
+   */
+  private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PARSER;
+
+  static {
+    DateTimeFormatter dateTimePart =
+        new DateTimeFormatterBuilder()
+            .appendYear(4, 4)
+            .appendLiteral('-')
+            .appendMonthOfYear(2)
+            .appendLiteral('-')
+            .appendDayOfMonth(2)
+            .appendLiteral(' ')
+            .appendHourOfDay(2)
+            .appendLiteral(':')
+            .appendMinuteOfHour(2)
+            .appendLiteral(':')
+            .appendSecondOfMinute(2)
+            .toFormatter()
+            .withZoneUTC();
+    BIGQUERY_TIMESTAMP_PARSER =
+        new DateTimeFormatterBuilder()
+            .append(dateTimePart)
+            .appendOptional(
+                new DateTimeFormatterBuilder()
+                    .appendLiteral('.')
+                    .appendFractionOfSecond(3, 6)
+                    .toParser())
+            .appendLiteral(" UTC")
+            .toFormatter()
+            .withZoneUTC();
+    BIGQUERY_TIMESTAMP_PRINTER =
+        new DateTimeFormatterBuilder()
+            .append(dateTimePart)
+            .appendLiteral('.')
+            .appendFractionOfSecond(3, 3)
+            .appendLiteral(" UTC")
+            .toFormatter();
+  }
+
   private static final Map<TypeName, StandardSQLTypeName> 
BEAM_TO_BIGQUERY_TYPE_MAPPING =
       ImmutableMap.<TypeName, StandardSQLTypeName>builder()
           .put(TypeName.BYTE, StandardSQLTypeName.INT64)
@@ -120,9 +166,17 @@ public class BigQueryUtils {
           .put(TypeName.STRING, str -> str)
           .put(
               TypeName.DATETIME,
-              str ->
-                  new DateTime(
-                      (long) (Double.parseDouble(str) * 1000), 
ISOChronology.getInstanceUTC()))
+              str -> {
+                if (str == null || str.length() == 0) {
+                  return null;
+                }
+                if (str.endsWith("UTC")) {
+                  return 
BIGQUERY_TIMESTAMP_PARSER.parseDateTime(str).toDateTime(DateTimeZone.UTC);
+                } else {
+                  return new DateTime(
+                      (long) (Double.parseDouble(str) * 1000), 
ISOChronology.getInstanceUTC());
+                }
+              })
           .put(TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
           .build();
 
@@ -367,11 +421,9 @@ public class BigQueryUtils {
         return toTableRow((Row) fieldValue);
 
       case DATETIME:
-        DateTimeFormatter patternFormat =
-            new DateTimeFormatterBuilder()
-                .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
-                .toFormatter();
-        return ((Instant) fieldValue).toDateTime().toString(patternFormat);
+        return ((Instant) fieldValue)
+            .toDateTime(DateTimeZone.UTC)
+            .toString(BIGQUERY_TIMESTAMP_PRINTER);
 
       case INT16:
       case INT32:
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 67d997e..a215635 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.joda.time.chrono.ISOChronology;
+import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -55,7 +56,10 @@ public class BigQueryUtilsTest {
           .addNullableField("id", Schema.FieldType.INT64)
           .addNullableField("value", Schema.FieldType.DOUBLE)
           .addNullableField("name", Schema.FieldType.STRING)
-          .addNullableField("timestamp", Schema.FieldType.DATETIME)
+          .addNullableField("timestamp_variant1", Schema.FieldType.DATETIME)
+          .addNullableField("timestamp_variant2", Schema.FieldType.DATETIME)
+          .addNullableField("timestamp_variant3", Schema.FieldType.DATETIME)
+          .addNullableField("timestamp_variant4", Schema.FieldType.DATETIME)
           .addNullableField("valid", Schema.FieldType.BOOLEAN)
           .addNullableField("binary", Schema.FieldType.BYTES)
           .build();
@@ -78,8 +82,22 @@ public class BigQueryUtilsTest {
   private static final TableFieldSchema NAME =
       new 
TableFieldSchema().setName("name").setType(StandardSQLTypeName.STRING.toString());
 
-  private static final TableFieldSchema TIMESTAMP =
-      new 
TableFieldSchema().setName("timestamp").setType(StandardSQLTypeName.TIMESTAMP.toString());
+  private static final TableFieldSchema TIMESTAMP_VARIANT1 =
+      new TableFieldSchema()
+          .setName("timestamp_variant1")
+          .setType(StandardSQLTypeName.TIMESTAMP.toString());
+  private static final TableFieldSchema TIMESTAMP_VARIANT2 =
+      new TableFieldSchema()
+          .setName("timestamp_variant2")
+          .setType(StandardSQLTypeName.TIMESTAMP.toString());
+  private static final TableFieldSchema TIMESTAMP_VARIANT3 =
+      new TableFieldSchema()
+          .setName("timestamp_variant3")
+          .setType(StandardSQLTypeName.TIMESTAMP.toString());
+  private static final TableFieldSchema TIMESTAMP_VARIANT4 =
+      new TableFieldSchema()
+          .setName("timestamp_variant4")
+          .setType(StandardSQLTypeName.TIMESTAMP.toString());
 
   private static final TableFieldSchema VALID =
       new 
TableFieldSchema().setName("valid").setType(StandardSQLTypeName.BOOL.toString());
@@ -98,14 +116,34 @@ public class BigQueryUtilsTest {
           .setName("row")
           .setType(StandardSQLTypeName.STRUCT.toString())
           .setMode(Mode.NULLABLE.toString())
-          .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID, BINARY));
+          .setFields(
+              Arrays.asList(
+                  ID,
+                  VALUE,
+                  NAME,
+                  TIMESTAMP_VARIANT1,
+                  TIMESTAMP_VARIANT2,
+                  TIMESTAMP_VARIANT3,
+                  TIMESTAMP_VARIANT4,
+                  VALID,
+                  BINARY));
 
   private static final TableFieldSchema ROWS =
       new TableFieldSchema()
           .setName("rows")
           .setType(StandardSQLTypeName.STRUCT.toString())
           .setMode(Mode.REPEATED.toString())
-          .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID, BINARY));
+          .setFields(
+              Arrays.asList(
+                  ID,
+                  VALUE,
+                  NAME,
+                  TIMESTAMP_VARIANT1,
+                  TIMESTAMP_VARIANT2,
+                  TIMESTAMP_VARIANT3,
+                  TIMESTAMP_VARIANT4,
+                  VALID,
+                  BINARY));
 
   // Make sure that chosen BYTES test value is the same after a full base64 
round trip.
   private static final Row FLAT_ROW =
@@ -114,6 +152,15 @@ public class BigQueryUtilsTest {
               123L,
               123.456,
               "test",
+              ISODateTimeFormat.dateHourMinuteSecondFraction()
+                  .withZoneUTC()
+                  .parseDateTime("2019-08-16T13:52:07.000"),
+              ISODateTimeFormat.dateHourMinuteSecondFraction()
+                  .withZoneUTC()
+                  .parseDateTime("2019-08-17T14:52:07.123"),
+              ISODateTimeFormat.dateHourMinuteSecondFraction()
+                  .withZoneUTC()
+                  .parseDateTime("2019-08-18T15:52:07.123"),
               new DateTime(123456),
               false,
               Base64.getDecoder().decode("ABCD1234"))
@@ -124,22 +171,31 @@ public class BigQueryUtilsTest {
           .set("id", "123")
           .set("value", "123.456")
           .set("name", "test")
+          .set("timestamp_variant1", "2019-08-16 13:52:07 UTC")
+          .set("timestamp_variant2", "2019-08-17 14:52:07.123 UTC")
+          // we'll lose precision, but it's something BigQuery can output!
+          .set("timestamp_variant3", "2019-08-18 15:52:07.123456 UTC")
           .set(
-              "timestamp",
+              "timestamp_variant4",
               String.valueOf(
                   new DateTime(123456L, 
ISOChronology.getInstanceUTC()).getMillis() / 1000.0D))
           .set("valid", "false")
           .set("binary", "ABCD1234");
 
   private static final Row NULL_FLAT_ROW =
-      Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null, 
null).build();
+      Row.withSchema(FLAT_TYPE)
+          .addValues(null, null, null, null, null, null, null, null, null)
+          .build();
 
   private static final TableRow BQ_NULL_FLAT_ROW =
       new TableRow()
           .set("id", null)
           .set("value", null)
           .set("name", null)
-          .set("timestamp", null)
+          .set("timestamp_variant1", null)
+          .set("timestamp_variant2", null)
+          .set("timestamp_variant3", null)
+          .set("timestamp_variant4", null)
           .set("valid", null)
           .set("binary", null);
 
@@ -165,7 +221,18 @@ public class BigQueryUtilsTest {
           .set("rows", Collections.singletonList(Collections.singletonMap("v", 
BQ_FLAT_ROW)));
 
   private static final TableSchema BQ_FLAT_TYPE =
-      new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, 
VALID, BINARY));
+      new TableSchema()
+          .setFields(
+              Arrays.asList(
+                  ID,
+                  VALUE,
+                  NAME,
+                  TIMESTAMP_VARIANT1,
+                  TIMESTAMP_VARIANT2,
+                  TIMESTAMP_VARIANT3,
+                  TIMESTAMP_VARIANT4,
+                  VALID,
+                  BINARY));
 
   private static final TableSchema BQ_ARRAY_TYPE = new 
TableSchema().setFields(Arrays.asList(IDS));
 
@@ -192,7 +259,18 @@ public class BigQueryUtilsTest {
   public void testToTableSchema_flat() {
     TableSchema schema = toTableSchema(FLAT_TYPE);
 
-    assertThat(schema.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID, BINARY));
+    assertThat(
+        schema.getFields(),
+        containsInAnyOrder(
+            ID,
+            VALUE,
+            NAME,
+            TIMESTAMP_VARIANT1,
+            TIMESTAMP_VARIANT2,
+            TIMESTAMP_VARIANT3,
+            TIMESTAMP_VARIANT4,
+            VALID,
+            BINARY));
   }
 
   @Test
@@ -211,7 +289,18 @@ public class BigQueryUtilsTest {
     assertThat(field.getName(), equalTo("row"));
     assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
     assertThat(field.getMode(), nullValue());
-    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID, BINARY));
+    assertThat(
+        field.getFields(),
+        containsInAnyOrder(
+            ID,
+            VALUE,
+            NAME,
+            TIMESTAMP_VARIANT1,
+            TIMESTAMP_VARIANT2,
+            TIMESTAMP_VARIANT3,
+            TIMESTAMP_VARIANT4,
+            VALID,
+            BINARY));
   }
 
   @Test
@@ -223,7 +312,18 @@ public class BigQueryUtilsTest {
     assertThat(field.getName(), equalTo("rows"));
     assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
     assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
-    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID, BINARY));
+    assertThat(
+        field.getFields(),
+        containsInAnyOrder(
+            ID,
+            VALUE,
+            NAME,
+            TIMESTAMP_VARIANT1,
+            TIMESTAMP_VARIANT2,
+            TIMESTAMP_VARIANT3,
+            TIMESTAMP_VARIANT4,
+            VALID,
+            BINARY));
   }
 
   @Test
@@ -231,7 +331,7 @@ public class BigQueryUtilsTest {
     TableRow row = toTableRow().apply(FLAT_ROW);
     System.out.println(row);
 
-    assertThat(row.size(), equalTo(6));
+    assertThat(row.size(), equalTo(9));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
@@ -253,9 +353,12 @@ public class BigQueryUtilsTest {
 
     assertThat(row.size(), equalTo(1));
     row = (TableRow) row.get("row");
-    assertThat(row.size(), equalTo(6));
+    assertThat(row.size(), equalTo(9));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
+    assertThat(row, hasEntry("value", "123.456"));
+    assertThat(row, hasEntry("value", "123.456"));
+    assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
     assertThat(row, hasEntry("valid", "false"));
     assertThat(row, hasEntry("binary", "ABCD1234"));
@@ -267,7 +370,7 @@ public class BigQueryUtilsTest {
 
     assertThat(row.size(), equalTo(1));
     row = ((List<TableRow>) row.get("rows")).get(0);
-    assertThat(row.size(), equalTo(6));
+    assertThat(row.size(), equalTo(9));
     assertThat(row, hasEntry("id", "123"));
     assertThat(row, hasEntry("value", "123.456"));
     assertThat(row, hasEntry("name", "test"));
@@ -279,11 +382,14 @@ public class BigQueryUtilsTest {
   public void testToTableRow_null_row() {
     TableRow row = toTableRow().apply(NULL_FLAT_ROW);
 
-    assertThat(row.size(), equalTo(6));
+    assertThat(row.size(), equalTo(9));
     assertThat(row, hasEntry("id", null));
     assertThat(row, hasEntry("value", null));
     assertThat(row, hasEntry("name", null));
-    assertThat(row, hasEntry("timestamp", null));
+    assertThat(row, hasEntry("timestamp_variant1", null));
+    assertThat(row, hasEntry("timestamp_variant2", null));
+    assertThat(row, hasEntry("timestamp_variant3", null));
+    assertThat(row, hasEntry("timestamp_variant4", null));
     assertThat(row, hasEntry("valid", null));
     assertThat(row, hasEntry("binary", null));
   }

Reply via email to