This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d94e348 [BEAM-7999] Fix BigQuery timestamp handling for Schema Aware
PCollection
new 5a62f94 Merge pull request #9369 from
alexvanboxel/feature/BEAM-7999-bq-timestamp
d94e348 is described below
commit d94e3485da50b9dba09b77928d61c9f9ebad7597
Author: Alex Van Boxel <[email protected]>
AuthorDate: Sat Aug 17 15:17:35 2019 +0200
[BEAM-7999] Fix BigQuery timestamp handling for Schema Aware PCollection
This fixes the handling of timestamps when reading from BigQuery. It
was assumed that BigQuery always outputs a Double, but reading a table directly
outputs a textual version in the JSON. This is now handled. Also, the
conversion from Beam Row now outputs the BigQuery textual version instead of the
ISO standard representation, which makes the implementation symmetric.
---
.../beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 68 ++++++++--
.../sdk/io/gcp/bigquery/BigQueryUtilsTest.java | 140 ++++++++++++++++++---
2 files changed, 183 insertions(+), 25 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 011f0c4..04a92ec 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -49,6 +49,7 @@ import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.chrono.ISOChronology;
@@ -90,6 +91,51 @@ public class BigQueryUtils {
}
}
+ private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PRINTER;
+
+ /**
+ * Native BigQuery formatter for its timestamp format. Depending on the
precision stored in
+ * the column, the fractional-second part will have 6 or 3 digits, or be absent. Example {@code
2019-08-16
+ * 00:52:07[.123]|[.123456] UTC}
+ */
+ private static final DateTimeFormatter BIGQUERY_TIMESTAMP_PARSER;
+
+ static {
+ DateTimeFormatter dateTimePart =
+ new DateTimeFormatterBuilder()
+ .appendYear(4, 4)
+ .appendLiteral('-')
+ .appendMonthOfYear(2)
+ .appendLiteral('-')
+ .appendDayOfMonth(2)
+ .appendLiteral(' ')
+ .appendHourOfDay(2)
+ .appendLiteral(':')
+ .appendMinuteOfHour(2)
+ .appendLiteral(':')
+ .appendSecondOfMinute(2)
+ .toFormatter()
+ .withZoneUTC();
+ BIGQUERY_TIMESTAMP_PARSER =
+ new DateTimeFormatterBuilder()
+ .append(dateTimePart)
+ .appendOptional(
+ new DateTimeFormatterBuilder()
+ .appendLiteral('.')
+ .appendFractionOfSecond(3, 6)
+ .toParser())
+ .appendLiteral(" UTC")
+ .toFormatter()
+ .withZoneUTC();
+ BIGQUERY_TIMESTAMP_PRINTER =
+ new DateTimeFormatterBuilder()
+ .append(dateTimePart)
+ .appendLiteral('.')
+ .appendFractionOfSecond(3, 3)
+ .appendLiteral(" UTC")
+ .toFormatter();
+ }
+
private static final Map<TypeName, StandardSQLTypeName>
BEAM_TO_BIGQUERY_TYPE_MAPPING =
ImmutableMap.<TypeName, StandardSQLTypeName>builder()
.put(TypeName.BYTE, StandardSQLTypeName.INT64)
@@ -120,9 +166,17 @@ public class BigQueryUtils {
.put(TypeName.STRING, str -> str)
.put(
TypeName.DATETIME,
- str ->
- new DateTime(
- (long) (Double.parseDouble(str) * 1000),
ISOChronology.getInstanceUTC()))
+ str -> {
+ if (str == null || str.length() == 0) {
+ return null;
+ }
+ if (str.endsWith("UTC")) {
+ return
BIGQUERY_TIMESTAMP_PARSER.parseDateTime(str).toDateTime(DateTimeZone.UTC);
+ } else {
+ return new DateTime(
+ (long) (Double.parseDouble(str) * 1000),
ISOChronology.getInstanceUTC());
+ }
+ })
.put(TypeName.BYTES, str -> BaseEncoding.base64().decode(str))
.build();
@@ -367,11 +421,9 @@ public class BigQueryUtils {
return toTableRow((Row) fieldValue);
case DATETIME:
- DateTimeFormatter patternFormat =
- new DateTimeFormatterBuilder()
- .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")
- .toFormatter();
- return ((Instant) fieldValue).toDateTime().toString(patternFormat);
+ return ((Instant) fieldValue)
+ .toDateTime(DateTimeZone.UTC)
+ .toString(BIGQUERY_TIMESTAMP_PRINTER);
case INT16:
case INT32:
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 67d997e..a215635 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.joda.time.Instant;
import org.joda.time.chrono.ISOChronology;
+import org.joda.time.format.ISODateTimeFormat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -55,7 +56,10 @@ public class BigQueryUtilsTest {
.addNullableField("id", Schema.FieldType.INT64)
.addNullableField("value", Schema.FieldType.DOUBLE)
.addNullableField("name", Schema.FieldType.STRING)
- .addNullableField("timestamp", Schema.FieldType.DATETIME)
+ .addNullableField("timestamp_variant1", Schema.FieldType.DATETIME)
+ .addNullableField("timestamp_variant2", Schema.FieldType.DATETIME)
+ .addNullableField("timestamp_variant3", Schema.FieldType.DATETIME)
+ .addNullableField("timestamp_variant4", Schema.FieldType.DATETIME)
.addNullableField("valid", Schema.FieldType.BOOLEAN)
.addNullableField("binary", Schema.FieldType.BYTES)
.build();
@@ -78,8 +82,22 @@ public class BigQueryUtilsTest {
private static final TableFieldSchema NAME =
new
TableFieldSchema().setName("name").setType(StandardSQLTypeName.STRING.toString());
- private static final TableFieldSchema TIMESTAMP =
- new
TableFieldSchema().setName("timestamp").setType(StandardSQLTypeName.TIMESTAMP.toString());
+ private static final TableFieldSchema TIMESTAMP_VARIANT1 =
+ new TableFieldSchema()
+ .setName("timestamp_variant1")
+ .setType(StandardSQLTypeName.TIMESTAMP.toString());
+ private static final TableFieldSchema TIMESTAMP_VARIANT2 =
+ new TableFieldSchema()
+ .setName("timestamp_variant2")
+ .setType(StandardSQLTypeName.TIMESTAMP.toString());
+ private static final TableFieldSchema TIMESTAMP_VARIANT3 =
+ new TableFieldSchema()
+ .setName("timestamp_variant3")
+ .setType(StandardSQLTypeName.TIMESTAMP.toString());
+ private static final TableFieldSchema TIMESTAMP_VARIANT4 =
+ new TableFieldSchema()
+ .setName("timestamp_variant4")
+ .setType(StandardSQLTypeName.TIMESTAMP.toString());
private static final TableFieldSchema VALID =
new
TableFieldSchema().setName("valid").setType(StandardSQLTypeName.BOOL.toString());
@@ -98,14 +116,34 @@ public class BigQueryUtilsTest {
.setName("row")
.setType(StandardSQLTypeName.STRUCT.toString())
.setMode(Mode.NULLABLE.toString())
- .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID, BINARY));
+ .setFields(
+ Arrays.asList(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
private static final TableFieldSchema ROWS =
new TableFieldSchema()
.setName("rows")
.setType(StandardSQLTypeName.STRUCT.toString())
.setMode(Mode.REPEATED.toString())
- .setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP, VALID, BINARY));
+ .setFields(
+ Arrays.asList(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
// Make sure that chosen BYTES test value is the same after a full base64
round trip.
private static final Row FLAT_ROW =
@@ -114,6 +152,15 @@ public class BigQueryUtilsTest {
123L,
123.456,
"test",
+ ISODateTimeFormat.dateHourMinuteSecondFraction()
+ .withZoneUTC()
+ .parseDateTime("2019-08-16T13:52:07.000"),
+ ISODateTimeFormat.dateHourMinuteSecondFraction()
+ .withZoneUTC()
+ .parseDateTime("2019-08-17T14:52:07.123"),
+ ISODateTimeFormat.dateHourMinuteSecondFraction()
+ .withZoneUTC()
+ .parseDateTime("2019-08-18T15:52:07.123"),
new DateTime(123456),
false,
Base64.getDecoder().decode("ABCD1234"))
@@ -124,22 +171,31 @@ public class BigQueryUtilsTest {
.set("id", "123")
.set("value", "123.456")
.set("name", "test")
+ .set("timestamp_variant1", "2019-08-16 13:52:07 UTC")
+ .set("timestamp_variant2", "2019-08-17 14:52:07.123 UTC")
+ // we'll lose precision, but it's something BigQuery can output!
+ .set("timestamp_variant3", "2019-08-18 15:52:07.123456 UTC")
.set(
- "timestamp",
+ "timestamp_variant4",
String.valueOf(
new DateTime(123456L,
ISOChronology.getInstanceUTC()).getMillis() / 1000.0D))
.set("valid", "false")
.set("binary", "ABCD1234");
private static final Row NULL_FLAT_ROW =
- Row.withSchema(FLAT_TYPE).addValues(null, null, null, null, null,
null).build();
+ Row.withSchema(FLAT_TYPE)
+ .addValues(null, null, null, null, null, null, null, null, null)
+ .build();
private static final TableRow BQ_NULL_FLAT_ROW =
new TableRow()
.set("id", null)
.set("value", null)
.set("name", null)
- .set("timestamp", null)
+ .set("timestamp_variant1", null)
+ .set("timestamp_variant2", null)
+ .set("timestamp_variant3", null)
+ .set("timestamp_variant4", null)
.set("valid", null)
.set("binary", null);
@@ -165,7 +221,18 @@ public class BigQueryUtilsTest {
.set("rows", Collections.singletonList(Collections.singletonMap("v",
BQ_FLAT_ROW)));
private static final TableSchema BQ_FLAT_TYPE =
- new TableSchema().setFields(Arrays.asList(ID, VALUE, NAME, TIMESTAMP,
VALID, BINARY));
+ new TableSchema()
+ .setFields(
+ Arrays.asList(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
private static final TableSchema BQ_ARRAY_TYPE = new
TableSchema().setFields(Arrays.asList(IDS));
@@ -192,7 +259,18 @@ public class BigQueryUtilsTest {
public void testToTableSchema_flat() {
TableSchema schema = toTableSchema(FLAT_TYPE);
- assertThat(schema.getFields(), containsInAnyOrder(ID, VALUE, NAME,
TIMESTAMP, VALID, BINARY));
+ assertThat(
+ schema.getFields(),
+ containsInAnyOrder(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
}
@Test
@@ -211,7 +289,18 @@ public class BigQueryUtilsTest {
assertThat(field.getName(), equalTo("row"));
assertThat(field.getType(),
equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), nullValue());
- assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME,
TIMESTAMP, VALID, BINARY));
+ assertThat(
+ field.getFields(),
+ containsInAnyOrder(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
}
@Test
@@ -223,7 +312,18 @@ public class BigQueryUtilsTest {
assertThat(field.getName(), equalTo("rows"));
assertThat(field.getType(),
equalTo(StandardSQLTypeName.STRUCT.toString()));
assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
- assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME,
TIMESTAMP, VALID, BINARY));
+ assertThat(
+ field.getFields(),
+ containsInAnyOrder(
+ ID,
+ VALUE,
+ NAME,
+ TIMESTAMP_VARIANT1,
+ TIMESTAMP_VARIANT2,
+ TIMESTAMP_VARIANT3,
+ TIMESTAMP_VARIANT4,
+ VALID,
+ BINARY));
}
@Test
@@ -231,7 +331,7 @@ public class BigQueryUtilsTest {
TableRow row = toTableRow().apply(FLAT_ROW);
System.out.println(row);
- assertThat(row.size(), equalTo(6));
+ assertThat(row.size(), equalTo(9));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
@@ -253,9 +353,12 @@ public class BigQueryUtilsTest {
assertThat(row.size(), equalTo(1));
row = (TableRow) row.get("row");
- assertThat(row.size(), equalTo(6));
+ assertThat(row.size(), equalTo(9));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
+ assertThat(row, hasEntry("value", "123.456"));
+ assertThat(row, hasEntry("value", "123.456"));
+ assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
assertThat(row, hasEntry("valid", "false"));
assertThat(row, hasEntry("binary", "ABCD1234"));
@@ -267,7 +370,7 @@ public class BigQueryUtilsTest {
assertThat(row.size(), equalTo(1));
row = ((List<TableRow>) row.get("rows")).get(0);
- assertThat(row.size(), equalTo(6));
+ assertThat(row.size(), equalTo(9));
assertThat(row, hasEntry("id", "123"));
assertThat(row, hasEntry("value", "123.456"));
assertThat(row, hasEntry("name", "test"));
@@ -279,11 +382,14 @@ public class BigQueryUtilsTest {
public void testToTableRow_null_row() {
TableRow row = toTableRow().apply(NULL_FLAT_ROW);
- assertThat(row.size(), equalTo(6));
+ assertThat(row.size(), equalTo(9));
assertThat(row, hasEntry("id", null));
assertThat(row, hasEntry("value", null));
assertThat(row, hasEntry("name", null));
- assertThat(row, hasEntry("timestamp", null));
+ assertThat(row, hasEntry("timestamp_variant1", null));
+ assertThat(row, hasEntry("timestamp_variant2", null));
+ assertThat(row, hasEntry("timestamp_variant3", null));
+ assertThat(row, hasEntry("timestamp_variant4", null));
assertThat(row, hasEntry("valid", null));
assertThat(row, hasEntry("binary", null));
}