This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 76ff204 [BEAM-6718] Fix BigQuery SQL postcommit (#7904)
76ff204 is described below
commit 76ff2042cf92b9c41908b4c06871f340acdb4208
Author: reuvenlax <[email protected]>
AuthorDate: Sun Feb 24 19:52:26 2019 -0800
[BEAM-6718] Fix BigQuery SQL postcommit (#7904)
* Fix broken BigQuery logic
* Add support for CHAR type
* BigQuery AvroUtils must know about logical types
---
.../org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java | 18 ++++++++++++++++++
.../apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java | 9 +++------
2 files changed, 21 insertions(+), 6 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
index b58aa87..91b3cb9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
@@ -19,12 +19,21 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
/** Utils to help convert Apache Avro types to Beam types. */
public class AvroUtils {
+ // TODO: BigQuery shouldn't know about SQL internal logical types.
+ private static final Set<String> SQL_DATE_TIME_TYPES =
+ ImmutableSet.of(
+ "SqlDateType", "SqlTimeType", "SqlTimeWithLocalTzType", "SqlTimestampWithLocalTzType");
+ private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");
+
+ /** Tries to convert an Avro field to Beam field based on the target type of the Beam field. */
public static Object convertAvroFormat(Field beamField, Object value) {
Object ret;
TypeName beamFieldTypeName = beamField.getType().getTypeName();
@@ -48,6 +57,15 @@ public class AvroUtils {
case ARRAY:
ret = convertAvroArray(beamField, value);
break;
+ case LOGICAL_TYPE:
+ String identifier = beamField.getType().getLogicalType().getIdentifier();
+ if (SQL_DATE_TIME_TYPES.contains(identifier)) {
+ return new Instant().withMillis(((long) value) / 1000);
+ } else if (SQL_STRING_TYPES.contains(identifier)) {
+ return convertAvroPrimitiveTypes(TypeName.STRING, value);
+ } else {
+ throw new RuntimeException("Unknown logical type " + identifier);
+ }
case DECIMAL:
throw new RuntimeException("Does not support converting DECIMAL type value");
case MAP:
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 62be466..ceab03a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -103,8 +103,8 @@ public class BigQueryUtils {
.put("SqlDateType", StandardSQLTypeName.DATE)
.put("SqlTimeType", StandardSQLTypeName.TIME)
.put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
- .put("SqlTimestamp", StandardSQLTypeName.TIMESTAMP)
.put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)
+ .put("SqlCharType", StandardSQLTypeName.STRING)
.build();
/**
@@ -112,17 +112,14 @@ public class BigQueryUtils {
* FieldType}.
*/
private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {
- StandardSQLTypeName sqlType = BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
-
- if (sqlType == StandardSQLTypeName.TIMESTAMP && fieldType.getTypeName().isLogicalType()) {
+ if (fieldType.getTypeName().isLogicalType()) {
StandardSQLTypeName foundType = BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());
if (foundType != null) {
return foundType;
}
}
-
- return sqlType;
+ return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
}
private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {