[ 
https://issues.apache.org/jira/browse/BEAM-3983?focusedWorklogId=94817&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94817
 ]

ASF GitHub Bot logged work on BEAM-3983:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 22:15
            Start Date: 24/Apr/18 22:15
    Worklog Time Spent: 10m 
      Work Description: kennknowles closed pull request #4947: [BEAM-3983] Add 
utils for converting to BigQuery types
URL: https://github.com/apache/beam/pull/4947
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff once a foreign pull request is merged, it is reproduced
below for the sake of provenance:

diff --git a/build_rules.gradle b/build_rules.gradle
index a00643df153..4d723598e8e 100644
--- a/build_rules.gradle
+++ b/build_rules.gradle
@@ -225,6 +225,7 @@ ext.library = [
     google_api_services_storage: 
"com.google.apis:google-api-services-storage:v1-rev124-$google_clients_version",
     google_auth_library_credentials: 
"com.google.auth:google-auth-library-credentials:$google_auth_version",
     google_auth_library_oauth2_http: 
"com.google.auth:google-auth-library-oauth2-http:$google_auth_version",
+    google_cloud_bigquery: 
"com.google.cloud:google-cloud-bigquery:$google_clients_version",
     google_cloud_core: "com.google.cloud:google-cloud-core:1.0.2",
     google_cloud_core_grpc: 
"com.google.cloud:google-cloud-core-grpc:$grpc_version",
     google_cloud_dataflow_java_proto_library_all: 
"com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
diff --git a/pom.xml b/pom.xml
index 3e42a31f53f..332d001c412 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1119,6 +1119,12 @@
         </exclusions>
       </dependency>
 
+      <dependency>
+        <groupId>com.google.cloud</groupId>
+        <artifactId>google-cloud-bigquery</artifactId>
+        <version>${google-clients.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>com.google.cloud</groupId>
         <artifactId>google-cloud-core-grpc</artifactId>
diff --git a/sdks/java/io/google-cloud-platform/build.gradle 
b/sdks/java/io/google-cloud-platform/build.gradle
index 66f44f75215..d41a8ca2750 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -40,6 +40,7 @@ dependencies {
   shadow library.java.jackson_databind
   shadow library.java.grpc_core
   shadow library.java.google_api_services_bigquery
+  shadow library.java.google_cloud_bigquery
   shadow library.java.gax_grpc
   shadow library.java.google_cloud_core_grpc
   shadow library.java.google_api_services_pubsub
diff --git a/sdks/java/io/google-cloud-platform/pom.xml 
b/sdks/java/io/google-cloud-platform/pom.xml
index c315cff0823..0cf3180b429 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -187,6 +187,11 @@
       <artifactId>google-api-services-bigquery</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.cloud</groupId>
+      <artifactId>google-cloud-bigquery</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>com.google.api</groupId>
       <artifactId>gax-grpc</artifactId>
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
new file mode 100644
index 00000000000..ba47dbcca01
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * Utility methods for BigQuery related operations.
+ *
+ * <p><b>Example: Writing to BigQuery</b>
+ *
+ * <pre>{@code
+ * PCollection<Row> rows = ...;
+ *
+ * rows.apply(BigQueryIO.<Row>write()
+ *       .withSchema(BigQueryUtils.toTableSchema(rows))
+ *       .withFormatFunction(BigQueryUtils.toTableRow())
+ *       .to("my-project:my_dataset.my_table"));
+ * }</pre>
+ */
+public class BigQueryUtils {
+  private static final Map<TypeName, StandardSQLTypeName> 
BEAM_TO_BIGQUERY_TYPE_MAPPING =
+      ImmutableMap.<TypeName, StandardSQLTypeName>builder()
+          .put(TypeName.BYTE, StandardSQLTypeName.INT64)
+          .put(TypeName.INT16, StandardSQLTypeName.INT64)
+          .put(TypeName.INT32, StandardSQLTypeName.INT64)
+          .put(TypeName.INT64, StandardSQLTypeName.INT64)
+
+          .put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)
+          .put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)
+
+          .put(TypeName.DECIMAL, StandardSQLTypeName.FLOAT64)
+
+          .put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)
+
+          .put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)
+          .put(TypeName.ROW, StandardSQLTypeName.STRUCT)
+
+          .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)
+          .put(TypeName.STRING, StandardSQLTypeName.STRING)
+
+          .build();
+
+  private static final Map<byte[], StandardSQLTypeName> 
BEAM_TO_BIGQUERY_METADATA_MAPPING =
+      ImmutableMap.<byte[], StandardSQLTypeName>builder()
+          .put("DATE".getBytes(), StandardSQLTypeName.DATE)
+          .put("TIME".getBytes(), StandardSQLTypeName.TIME)
+          .put("TIME_WITH_LOCAL_TZ".getBytes(), StandardSQLTypeName.TIME)
+          .put("TS".getBytes(), StandardSQLTypeName.TIMESTAMP)
+          .put("TS_WITH_LOCAL_TZ".getBytes(), StandardSQLTypeName.TIMESTAMP)
+          .build();
+
+  /**
+   * Get the corresponding BigQuery {@link StandardSQLTypeName}
+   * for supported Beam {@link FieldType}.
+   */
+  private static StandardSQLTypeName toStandardSQLTypeName(FieldType 
fieldType) {
+    StandardSQLTypeName sqlType = 
BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());
+
+    if (sqlType == StandardSQLTypeName.TIMESTAMP && fieldType.getMetadata() != 
null) {
+      sqlType = BEAM_TO_BIGQUERY_METADATA_MAPPING.get(fieldType.getMetadata());
+    }
+
+    return sqlType;
+  }
+
+  private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {
+    List<TableFieldSchema> fields = new 
ArrayList<TableFieldSchema>(schema.getFieldCount());
+    for (Field schemaField : schema.getFields()) {
+      FieldType type = schemaField.getType();
+
+      TableFieldSchema field = new TableFieldSchema()
+        .setName(schemaField.getName());
+      if (schemaField.getDescription() != null && 
!schemaField.getDescription().equals("")) {
+        field.setDescription(schemaField.getDescription());
+      }
+
+      if (!schemaField.getNullable()) {
+        field.setMode(Mode.REQUIRED.toString());
+      }
+      if (TypeName.ARRAY == type.getTypeName()) {
+        type = type.getCollectionElementType();
+        field.setMode(Mode.REPEATED.toString());
+      }
+      if (TypeName.ROW == type.getTypeName()) {
+        Schema subType = type.getRowSchema();
+        field.setFields(toTableFieldSchema(subType));
+      }
+      field.setType(toStandardSQLTypeName(type).toString());
+
+      fields.add(field);
+    }
+    return fields;
+  }
+
+  /**
+   * Convert a Beam {@link Schema} to a BigQuery {@link TableSchema}.
+   */
+  public static TableSchema toTableSchema(Schema schema) {
+    return new TableSchema().setFields(toTableFieldSchema(schema));
+  }
+
+  /**
+   * Convert a Beam {@link PCollection} to a BigQuery {@link TableSchema}.
+   */
+  public static TableSchema toTableSchema(PCollection<Row> rows) {
+    RowCoder coder = (RowCoder) rows.getCoder();
+    return toTableSchema(coder.getSchema());
+  }
+
+  private static final SerializableFunction<Row, TableRow> TO_TABLE_ROW = new 
ToTableRow();
+
+  /**
+   * Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
+   */
+  public static SerializableFunction<Row, TableRow> toTableRow() {
+    return TO_TABLE_ROW;
+  }
+
+  /**
+   * Convert a Beam {@link Row} to a BigQuery {@link TableRow}.
+   */
+  private static class ToTableRow implements SerializableFunction<Row, 
TableRow> {
+    @Override
+    public TableRow apply(Row input) {
+      TableRow output = new TableRow();
+      for (int i = 0; i < input.getFieldCount(); i++) {
+        Object value = input.getValue(i);
+
+        Field schemaField = input.getSchema().getField(i);
+        TypeName type = schemaField.getType().getTypeName();
+        if (TypeName.ARRAY == type) {
+          type = 
schemaField.getType().getCollectionElementType().getTypeName();
+          if (TypeName.ROW == type) {
+            List<Row> rows = (List<Row>) value;
+            List<TableRow> tableRows = new ArrayList<TableRow>(rows.size());
+            for (int j = 0; j < rows.size(); j++) {
+              tableRows.add(apply(rows.get(j)));
+            }
+            value = tableRows;
+          }
+        } else if (TypeName.ROW == type) {
+          value = apply((Row) value);
+        }
+
+        output = output.set(
+            schemaField.getName(),
+            value);
+      }
+      return output;
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
new file mode 100644
index 00000000000..150187f6267
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableRow;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.toTableSchema;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.collection.IsMapContaining.hasEntry;
+import static org.junit.Assert.assertThat;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.Field.Mode;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+/**
+ * Tests for {@link BigQueryUtils}.
+ */
+public class BigQueryUtilsTest {
+  private static final Schema FLAT_TYPE = Schema
+      .builder()
+      .addInt64Field("id", true)
+      .addDoubleField("value", true)
+      .addStringField("name", true)
+      .addDateTimeField("timestamp", true)
+      .addBooleanField("valid", true)
+      .build();
+
+  private static final Schema ARRAY_TYPE = Schema
+      .builder()
+      .addArrayField("ids", Schema.TypeName.INT64.type())
+      .build();
+
+  private static final Schema ROW_TYPE = Schema
+      .builder()
+      .addRowField("row", FLAT_TYPE, true)
+      .build();
+
+  private static final Schema ARRAY_ROW_TYPE = Schema
+      .builder()
+      .addArrayField("rows", Schema.FieldType
+          .of(Schema.TypeName.ROW)
+          .withRowSchema(FLAT_TYPE))
+      .build();
+
+  private static final TableFieldSchema ID =
+      new TableFieldSchema().setName("id")
+        .setType(StandardSQLTypeName.INT64.toString());
+
+  private static final TableFieldSchema VALUE =
+      new TableFieldSchema().setName("value")
+        .setType(StandardSQLTypeName.FLOAT64.toString());
+
+  private static final TableFieldSchema NAME =
+      new TableFieldSchema().setName("name")
+        .setType(StandardSQLTypeName.STRING.toString());
+
+  private static final TableFieldSchema TIMESTAMP =
+      new TableFieldSchema().setName("timestamp")
+        .setType(StandardSQLTypeName.TIMESTAMP.toString());
+
+  private static final TableFieldSchema VALID =
+      new TableFieldSchema().setName("valid")
+        .setType(StandardSQLTypeName.BOOL.toString());
+
+  private static final TableFieldSchema IDS =
+      new TableFieldSchema().setName("ids")
+        .setType(StandardSQLTypeName.INT64.toString())
+        .setMode(Mode.REPEATED.toString());
+
+  private static final Row FLAT_ROW =
+      Row
+        .withSchema(FLAT_TYPE)
+        .addValues(123L, 123.456, "test", new DateTime(123456), false)
+        .build();
+
+  private static final Row ARRAY_ROW =
+      Row
+        .withSchema(ARRAY_TYPE)
+        .addValues((Object) Arrays.asList(123L, 124L))
+        .build();
+
+  private static final Row ROW_ROW =
+      Row
+        .withSchema(ROW_TYPE)
+        .addValues(FLAT_ROW)
+        .build();
+
+  private static final Row ARRAY_ROW_ROW =
+      Row
+        .withSchema(ARRAY_ROW_TYPE)
+        .addValues((Object) Arrays.asList(FLAT_ROW))
+        .build();
+
+  @Test public void testToTableSchema_flat() {
+    TableSchema schema = toTableSchema(FLAT_TYPE);
+
+    assertThat(schema.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableSchema_array() {
+    TableSchema schema = toTableSchema(ARRAY_TYPE);
+
+    assertThat(schema.getFields(), contains(IDS));
+  }
+
+  @Test public void testToTableSchema_row() {
+    TableSchema schema = toTableSchema(ROW_TYPE);
+
+    assertThat(schema.getFields().size(), equalTo(1));
+    TableFieldSchema field = schema.getFields().get(0);
+    assertThat(field.getName(), equalTo("row"));
+    assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
+    assertThat(field.getMode(), nullValue());
+    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableSchema_array_row() {
+    TableSchema schema = toTableSchema(ARRAY_ROW_TYPE);
+
+    assertThat(schema.getFields().size(), equalTo(1));
+    TableFieldSchema field = schema.getFields().get(0);
+    assertThat(field.getName(), equalTo("rows"));
+    assertThat(field.getType(), 
equalTo(StandardSQLTypeName.STRUCT.toString()));
+    assertThat(field.getMode(), equalTo(Mode.REPEATED.toString()));
+    assertThat(field.getFields(), containsInAnyOrder(ID, VALUE, NAME, 
TIMESTAMP, VALID));
+  }
+
+  @Test public void testToTableRow_flat() {
+    TableRow row = toTableRow().apply(FLAT_ROW);
+
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+
+  @Test public void testToTableRow_array() {
+    TableRow row = toTableRow().apply(ARRAY_ROW);
+
+    assertThat(row, hasEntry("ids", Arrays.asList(123L, 124L)));
+    assertThat(row.size(), equalTo(1));
+  }
+
+  @Test public void testToTableRow_row() {
+    TableRow row = toTableRow().apply(ROW_ROW);
+
+    assertThat(row.size(), equalTo(1));
+    row = (TableRow) row.get("row");
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+
+  @Test public void testToTableRow_array_row() {
+    TableRow row = toTableRow().apply(ARRAY_ROW_ROW);
+
+    assertThat(row.size(), equalTo(1));
+    row = ((List<TableRow>) row.get("rows")).get(0);
+    assertThat(row.size(), equalTo(5));
+    assertThat(row, hasEntry("id", 123L));
+    assertThat(row, hasEntry("value", 123.456));
+    assertThat(row, hasEntry("name", "test"));
+    assertThat(row, hasEntry("valid", false));
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94817)
    Time Spent: 10h 40m  (was: 10.5h)

> BigQuery writes from pure SQL
> -----------------------------
>
>                 Key: BEAM-3983
>                 URL: https://issues.apache.org/jira/browse/BEAM-3983
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Andrew Pilloud
>            Assignee: Andrew Pilloud
>            Priority: Major
>          Time Spent: 10h 40m
>  Remaining Estimate: 0h
>
> It would be nice if you could write to BigQuery in SQL without writing any 
> Java code. For example:
> {code:java}
> INSERT INTO bigquery SELECT * FROM PCOLLECTION{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to