This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ff204  [BEAM-6718] Fix BigQuery SQL postcommit (#7904)
76ff204 is described below

commit 76ff2042cf92b9c41908b4c06871f340acdb4208
Author: reuvenlax <[email protected]>
AuthorDate: Sun Feb 24 19:52:26 2019 -0800

    [BEAM-6718] Fix BigQuery SQL postcommit (#7904)
    
    * Fix broken BigQuery logic
    * Add support for CHAR type
    * BigQuery AvroUtils must know about logical types
---
 .../org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java | 18 ++++++++++++++++++
 .../apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java |  9 +++------
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
index b58aa87..91b3cb9 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroUtils.java
@@ -19,12 +19,21 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
 import org.joda.time.Instant;
 
 /** Utils to help convert Apache Avro types to Beam types. */
 public class AvroUtils {
+  // TODO: BigQuery shouldn't know about SQL internal logical types.
+  private static final Set<String> SQL_DATE_TIME_TYPES =
+      ImmutableSet.of(
+          "SqlDateType", "SqlTimeType", "SqlTimeWithLocalTzType", 
"SqlTimestampWithLocalTzType");
+  private static final Set<String> SQL_STRING_TYPES = 
ImmutableSet.of("SqlCharType");
+
+  /** Tries to convert an Avro field to Beam field based on the target type of 
the Beam field. */
   public static Object convertAvroFormat(Field beamField, Object value) {
     Object ret;
     TypeName beamFieldTypeName = beamField.getType().getTypeName();
@@ -48,6 +57,15 @@ public class AvroUtils {
       case ARRAY:
         ret = convertAvroArray(beamField, value);
         break;
+      case LOGICAL_TYPE:
+        String identifier = 
beamField.getType().getLogicalType().getIdentifier();
+        if (SQL_DATE_TIME_TYPES.contains(identifier)) {
+          return new Instant().withMillis(((long) value) / 1000);
+        } else if (SQL_STRING_TYPES.contains(identifier)) {
+          return convertAvroPrimitiveTypes(TypeName.STRING, value);
+        } else {
+          throw new RuntimeException("Unknown logical type " + identifier);
+        }
       case DECIMAL:
         throw new RuntimeException("Does not support converting DECIMAL type 
value");
       case MAP:
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 62be466..ceab03a 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -103,8 +103,8 @@ public class BigQueryUtils {
           .put("SqlDateType", StandardSQLTypeName.DATE)
           .put("SqlTimeType", StandardSQLTypeName.TIME)
           .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)
-          .put("SqlTimestamp", StandardSQLTypeName.TIMESTAMP)
           .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)
+          .put("SqlCharType", StandardSQLTypeName.STRING)
           .build();
 
   /**
@@ -112,17 +112,14 @@ public class BigQueryUtils {
    * FieldType}.
    */
   private static StandardSQLTypeName toStandardSQLTypeName(FieldType 
fieldType) {
-    StandardSQLTypeName sqlType = 
BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
-
-    if (sqlType == StandardSQLTypeName.TIMESTAMP && 
fieldType.getTypeName().isLogicalType()) {
+    if (fieldType.getTypeName().isLogicalType()) {
       StandardSQLTypeName foundType =
           
BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());
       if (foundType != null) {
         return foundType;
       }
     }
-
-    return sqlType;
+    return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
   }
 
   private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {

Reply via email to